From 694807979ec072487cd6ab51b398a4074ecbc1b2 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Wed, 8 Oct 2025 15:24:08 -0500
Subject: [PATCH 01/11] start

---
 datafusion/expr/src/expr.rs        | 37 ++++++++++++++++--------------
 datafusion/expr/src/expr_fn.rs     |  2 +-
 datafusion/expr/src/expr_schema.rs | 22 ++++++++----------
 3 files changed, 31 insertions(+), 30 deletions(-)

diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 282b3f6a0f55..a12cf268de2b 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1370,13 +1370,16 @@ pub struct Placeholder {
     /// The identifier of the parameter, including the leading `$` (e.g, `"$1"` or `"$foo"`)
     pub id: String,
     /// The type the parameter will be filled in with
-    pub data_type: Option<DataType>,
+    pub field: Option<FieldRef>,
 }
 
 impl Placeholder {
     /// Create a new Placeholder expression
     pub fn new(id: String, data_type: Option<DataType>) -> Self {
-        Self { id, data_type }
+        Self {
+            id,
+            field: data_type.map(|dt| Arc::new(Field::new("", dt, true))),
+        }
     }
 }
 
@@ -2888,17 +2891,17 @@ impl HashNode for Expr {
 
 // Modifies expr if it is a placeholder with datatype of right
 fn rewrite_placeholder(expr: &mut Expr, other: &Expr, schema: &DFSchema) -> Result<()> {
-    if let Expr::Placeholder(Placeholder { id: _, data_type }) = expr {
-        if data_type.is_none() {
-            let other_dt = other.get_type(schema);
-            match other_dt {
+    if let Expr::Placeholder(Placeholder { id: _, field }) = expr {
+        if field.is_none() {
+            let other_field = other.to_field(schema);
+            match other_field {
                 Err(e) => {
                     Err(e.context(format!(
                         "Can not find type of {other} needed to infer type of {expr}"
                     )))?;
                 }
-                Ok(dt) => {
-                    *data_type = Some(dt);
+                Ok((_, other_field)) => {
+                    *field = Some(other_field);
                 }
             }
         };
@@ -3730,15 +3733,15 @@ mod test {
         let param_placeholders = vec![
             Expr::Placeholder(Placeholder {
                 id: "$1".to_string(),
-                data_type: None,
+                field: None,
             }),
             Expr::Placeholder(Placeholder {
                 id: "$2".to_string(),
-                data_type: None,
+                field: None,
             }),
             Expr::Placeholder(Placeholder {
                 id: "$3".to_string(),
-                data_type: None,
+                field: None,
             }),
         ];
         let in_list = Expr::InList(InList {
@@ -3764,8 +3767,8 @@
             match expr {
                 Expr::Placeholder(placeholder) => {
                     assert_eq!(
-                        placeholder.data_type,
-                        Some(DataType::Int32),
+                        placeholder.field.unwrap().data_type(),
+                        &DataType::Int32,
                         "Placeholder {} should infer Int32",
                         placeholder.id
                     );
@@ -3789,7 +3792,7 @@
             expr: Box::new(col("name")),
             pattern: Box::new(Expr::Placeholder(Placeholder {
                 id: "$1".to_string(),
-                data_type: None,
+                field: None,
             })),
             negated: false,
             case_insensitive: false,
@@ -3802,7 +3805,7 @@
         match inferred_expr {
             Expr::Like(like) => match *like.pattern {
                 Expr::Placeholder(placeholder) => {
-                    assert_eq!(placeholder.data_type, Some(DataType::Utf8));
+                    assert_eq!(placeholder.field.unwrap().data_type(), &DataType::Utf8);
                 }
                 _ => panic!("Expected Placeholder"),
             },
@@ -3817,8 +3820,8 @@
             Expr::SimilarTo(like) => match *like.pattern {
                 Expr::Placeholder(placeholder) => {
                     assert_eq!(
-                        placeholder.data_type,
-                        Some(DataType::Utf8),
+                        placeholder.field.unwrap().data_type(),
+                        &DataType::Utf8,
                         "Placeholder {} should infer Utf8",
                         placeholder.id
                     );
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 4d8b94ba27ff..1c288a75e26f 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -124,7 +124,7 @@ pub fn ident(name: impl Into<String>) -> Expr {
 pub fn placeholder(id: impl Into<String>) -> Expr {
     Expr::Placeholder(Placeholder {
         id: id.into(),
-        data_type: None,
+        field: None,
     })
 }
 
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index e803e3534130..81af29c2f683 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -433,18 +433,16 @@ impl ExprSchemable for Expr {
                 ..
             }) => {
                 let field = match &**expr {
-                    Expr::Placeholder(Placeholder { data_type, .. }) => {
-                        match &data_type {
-                            None => schema
-                                .data_type_and_nullable(&Column::from_name(name))
-                                .map(|(d, n)| Field::new(&schema_name, d.clone(), n)),
-                            Some(dt) => Ok(Field::new(
-                                &schema_name,
-                                dt.clone(),
-                                expr.nullable(schema)?,
-                            )),
-                        }
-                    }
+                    Expr::Placeholder(Placeholder { field, .. }) => match &field {
+                        None => schema
+                            .data_type_and_nullable(&Column::from_name(name))
+                            .map(|(d, n)| Field::new(&schema_name, d.clone(), n)),
+                        Some(field) => Ok(field
+                            .as_ref()
+                            .clone()
+                            .with_name(&schema_name)
+                            .with_nullable(expr.nullable(schema)?)),
+                    },
                     _ => expr.to_field(schema).map(|(_, f)| f.as_ref().clone()),
                 }?;

From 24c51fdb6d396b33b99cdb2b72a3c6a491a41044 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Wed, 8 Oct 2025 17:07:28 -0500
Subject: [PATCH 02/11] builds!

---
 datafusion/expr/src/expr_schema.rs            | 10 +++---
 datafusion/expr/src/logical_plan/plan.rs      | 35 ++++++++++++++-----
 datafusion/proto/src/logical_plan/to_proto.rs | 12 +++++--
 datafusion/sql/src/statement.rs               |  6 ++--
 4 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 81af29c2f683..00708fd3411d 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -104,9 +104,9 @@ impl ExprSchemable for Expr {
     fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
         match self {
             Expr::Alias(Alias { expr, name, .. }) => match &**expr {
-                Expr::Placeholder(Placeholder { data_type, .. }) => match &data_type {
+                Expr::Placeholder(Placeholder { field, .. }) => match &field {
                     None => schema.data_type(&Column::from_name(name)).cloned(),
-                    Some(dt) => Ok(dt.clone()),
+                    Some(field) => Ok(field.data_type().clone()),
                 },
                 _ => expr.get_type(schema),
             },
@@ -211,9 +211,9 @@
             )
             .get_result_type(),
             Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
-            Expr::Placeholder(Placeholder { data_type, .. }) => {
-                if let Some(dtype) = data_type {
-                    Ok(dtype.clone())
+            Expr::Placeholder(Placeholder { field, .. }) => {
+                if let Some(field) = field {
+                    Ok(field.data_type().clone())
                 } else {
                     // If the placeholder's type hasn't been specified, treat it as
                     // null (unspecified placeholders generate an error during planning)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index b8200ab8a48c..ebb1df9f1126 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -51,7 +51,7 @@ use crate::{
     WindowFunctionDefinition,
 };
 
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
 use datafusion_common::cse::{NormalizeEq, Normalizeable};
 use datafusion_common::format::ExplainFormat;
 use datafusion_common::tree_node::{
@@ -1494,24 +1494,43 @@ impl LogicalPlan {
     }
 
     /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
+    ///
+    /// Note that this will drop any extension or field metadata attached to parameters. Use
+    /// [`LogicalPlan::get_parameter_fields`] to keep extension metadata.
     pub fn get_parameter_types(
         &self,
     ) -> Result<HashMap<String, Option<DataType>>, DataFusionError> {
-        let mut param_types: HashMap<String, Option<DataType>> = HashMap::new();
+        let mut parameter_fields = self.get_parameter_fields()?;
+        Ok(parameter_fields
+            .drain()
+            .map(|(name, maybe_field)| {
+                (name, maybe_field.map(|field| field.data_type().clone()))
+            })
+            .collect())
+    }
+
+    /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and FieldRefs
+    pub fn get_parameter_fields(
+        &self,
+    ) -> Result<HashMap<String, Option<FieldRef>>, DataFusionError> {
+        let mut param_types: HashMap<String, Option<FieldRef>> = HashMap::new();
         self.apply_with_subqueries(|plan| {
             plan.apply_expressions(|expr| {
                 expr.apply(|expr| {
-                    if let Expr::Placeholder(Placeholder { id, data_type }) = expr {
+                    if let Expr::Placeholder(Placeholder { id, field }) = expr {
                         let prev = param_types.get(id);
-                        match (prev, data_type) {
-                            (Some(Some(prev)), Some(dt)) => {
-                                if prev != dt {
+                        match (prev, field) {
+                            (Some(Some(prev)), Some(field)) => {
+                                // This check is possibly too strict (requires nullability and field
+                                // metadata align perfectly, rather than compute true type equality
+                                // when field metadata is representing an extension type)
+                                if prev != field {
                                     plan_err!("Conflicting types for {id}")?;
                                 }
                             }
-                            (_, Some(dt)) => {
-                                param_types.insert(id.clone(), Some(dt.clone()));
+                            (_, Some(field)) => {
+                                param_types.insert(id.clone(), Some(Arc::clone(field)));
                             }
                             _ => {
                                 param_types.insert(id.clone(), None);
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 1be3300008c7..179fd1a62e85 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -600,9 +600,15 @@ pub fn serialize_expr(
                 })),
             }
         }
-        Expr::Placeholder(Placeholder { id, data_type }) => {
-            let data_type = match data_type {
-                Some(data_type) => Some(data_type.try_into()?),
+        Expr::Placeholder(Placeholder { id, field }) => {
+            let data_type = match field {
+                Some(field) => {
+                    if !field.metadata().is_empty() {
+                        return Err(Error::General(format!("Can't serialize placeholder with metadata ('{id}') to protobuf.")));
+                    }
+
+                    Some(field.data_type().try_into()?)
+                }
                 None => None,
             };
             protobuf::LogicalExprNode {
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 0e868e8c2689..d1d64091c8cc 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1998,10 +1998,10 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                 )?;
                 // Update placeholder's datatype to the type of the target column
                 if let Expr::Placeholder(placeholder) = &mut expr {
-                    placeholder.data_type = placeholder
-                        .data_type
+                    placeholder.field = placeholder
+                        .field
                         .take()
-                        .or_else(|| Some(field.data_type().clone()));
+                        .or_else(|| Some(Arc::clone(field)));
                 }
                 // Cast to target column type, if necessary
                 expr.cast_to(field.data_type(), source.schema())?
From 524eab1e0145a1df6351b1e9a492ca44deacec26 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 13:20:52 -0500
Subject: [PATCH 03/11] backward compatible proto

---
 datafusion/proto/proto/datafusion.proto       |  2 ++
 datafusion/proto/src/generated/pbjson.rs      | 36 +++++++++++++++++++
 datafusion/proto/src/generated/prost.rs       |  7 ++++
 datafusion/proto/src/logical_plan/to_proto.rs | 31 +++++++---------
 4 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index ee9ac0e7902d..b62eae739f40 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -413,6 +413,8 @@ message Wildcard {
 message PlaceholderNode {
   string id = 1;
   datafusion_common.ArrowType data_type = 2;
+  optional bool nullable = 3;
+  map<string, string> metadata = 4;
 }
 
 message LogicalExprList {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 29967d812000..1754a1e77784 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -18343,6 +18343,12 @@ impl serde::Serialize for PlaceholderNode {
         if self.data_type.is_some() {
             len += 1;
         }
+        if self.nullable.is_some() {
+            len += 1;
+        }
+        if !self.metadata.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.PlaceholderNode", len)?;
         if !self.id.is_empty() {
             struct_ser.serialize_field("id", &self.id)?;
         }
         if let Some(v) = self.data_type.as_ref() {
             struct_ser.serialize_field("dataType", v)?;
         }
+        if let Some(v) = self.nullable.as_ref() {
+            struct_ser.serialize_field("nullable", v)?;
+        }
+        if !self.metadata.is_empty() {
+            struct_ser.serialize_field("metadata", &self.metadata)?;
+        }
         struct_ser.end()
     }
 }
@@ -18363,12 +18375,16 @@ impl<'de> serde::Deserialize<'de> for PlaceholderNode {
             "id",
             "data_type",
             "dataType",
+            "nullable",
+            "metadata",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Id,
             DataType,
+            Nullable,
+            Metadata,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -18392,6 +18408,8 @@ impl<'de> serde::Deserialize<'de> for PlaceholderNode {
                         match value {
                             "id" => Ok(GeneratedField::Id),
                             "dataType" | "data_type" => Ok(GeneratedField::DataType),
+                            "nullable" => Ok(GeneratedField::Nullable),
+                            "metadata" => Ok(GeneratedField::Metadata),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -18413,6 +18431,8 @@ impl<'de> serde::Deserialize<'de> for PlaceholderNode {
             {
                 let mut id__ = None;
                 let mut data_type__ = None;
+                let mut nullable__ = None;
+                let mut metadata__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Id => {
                             if id__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("id"));
                             }
                             id__ = Some(map_.next_value()?);
                         }
@@ -18427,11 +18447,27 @@ impl<'de> serde::Deserialize<'de> for PlaceholderNode {
                             }
                             data_type__ = map_.next_value()?;
                         }
+                        GeneratedField::Nullable => {
+                            if nullable__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("nullable"));
+                            }
+                            nullable__ = map_.next_value()?;
+                        }
+                        GeneratedField::Metadata => {
+                            if metadata__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("metadata"));
+                            }
+                            metadata__ = Some(
+                                map_.next_value::<std::collections::HashMap<_, _>>()?
+                            );
+                        }
                     }
                 }
                 Ok(PlaceholderNode {
                     id: id__.unwrap_or_default(),
                     data_type: data_type__,
+                    nullable: nullable__,
+                    metadata: metadata__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index d3b5f566e98b..51ce9b4f8565 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -653,6 +653,13 @@ pub struct PlaceholderNode {
     pub id: ::prost::alloc::string::String,
     #[prost(message, optional, tag = "2")]
     pub data_type: ::core::option::Option<super::datafusion_common::ArrowType>,
+    #[prost(bool, optional, tag = "3")]
+    pub nullable: ::core::option::Option<bool>,
+    #[prost(map = "string, string", tag = "4")]
+    pub metadata: ::std::collections::HashMap<
+        ::prost::alloc::string::String,
+        ::prost::alloc::string::String,
+    >,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct LogicalExprList {
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 179fd1a62e85..8b59c97da432 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -600,24 +600,19 @@ pub fn serialize_expr(
                 })),
             }
         }
-        Expr::Placeholder(Placeholder { id, field }) => {
-            let data_type = match field {
-                Some(field) => {
-                    if !field.metadata().is_empty() {
-                        return Err(Error::General(format!("Can't serialize placeholder with metadata ('{id}') to protobuf.")));
-                    }
-
-                    Some(field.data_type().try_into()?)
-                }
-                None => None,
-            };
-            protobuf::LogicalExprNode {
-                expr_type: Some(ExprType::Placeholder(PlaceholderNode {
-                    id: id.clone(),
-                    data_type,
-                })),
-            }
-        }
+        Expr::Placeholder(Placeholder { id, field }) => protobuf::LogicalExprNode {
+            expr_type: Some(ExprType::Placeholder(PlaceholderNode {
+                id: id.clone(),
+                data_type: match field {
+                    Some(field) => Some(field.data_type().try_into()?),
+                    None => None,
+                },
+                nullable: field.as_ref().map(|f| f.is_nullable()),
+                metadata: field.as_ref()
+                    .map(|f| f.metadata().clone())
+                    .unwrap_or(HashMap::new()),
+            })),
+        },
     };
 
     Ok(expr_node)

From a2130c21e4a455e3db0ec358c753e108de227830 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 14:00:44 -0500
Subject: [PATCH 04/11] move FieldMetadata

---
 datafusion/common/src/lib.rs                  |   1 +
 datafusion/common/src/metadata.rs             | 250 ++++++++++++++++++
 datafusion/core/tests/dataframe/mod.rs        |   5 +-
 .../user_defined_scalar_functions.rs          |   2 +-
 datafusion/expr/src/expr.rs                   | 232 +---------------
 datafusion/expr/src/expr_schema.rs            |   5 +-
 datafusion/expr/src/literal.rs                |   3 +-
 datafusion/expr/src/logical_plan/builder.rs   |   7 +-
 .../simplify_expressions/expr_simplifier.rs   |   2 +-
 .../physical-expr/src/expressions/literal.rs  |   2 +-
 datafusion/physical-expr/src/planner.rs       |   5 +-
 datafusion/proto/src/logical_plan/to_proto.rs |   3 +-
 12 files changed, 270 insertions(+), 247 deletions(-)
 create mode 100644 datafusion/common/src/metadata.rs

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 24ec9b7be323..89dc4956abb1 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -47,6 +47,7 @@ pub mod file_options;
 pub mod format;
 pub mod hash_utils;
 pub mod instant;
+pub mod metadata;
 pub mod nested_struct;
 mod null_equality;
 pub mod parsers;
diff --git a/datafusion/common/src/metadata.rs b/datafusion/common/src/metadata.rs
new file mode 100644
index 000000000000..6f6f4e6adbe2
--- /dev/null
+++ b/datafusion/common/src/metadata.rs
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::BTreeMap, sync::Arc};
+
+use arrow::datatypes::Field;
+use hashbrown::HashMap;
+
+/// Literal metadata
+///
+/// Stores metadata associated with a literal expressions
+/// and is designed to be fast to `clone`.
+///
+/// This structure is used to store metadata associated with a literal expression, and it
+/// corresponds to the `metadata` field on [`Field`].
+///
+/// # Example: Create [`FieldMetadata`] from a [`Field`]
+/// ```
+/// # use std::collections::HashMap;
+/// # use datafusion_expr::expr::FieldMetadata;
+/// # use arrow::datatypes::{Field, DataType};
+/// # let field = Field::new("c1", DataType::Int32, true)
+/// #   .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));
+/// // Create a new `FieldMetadata` instance from a `Field`
+/// let metadata = FieldMetadata::new_from_field(&field);
+/// // There is also a `From` impl:
+/// let metadata = FieldMetadata::from(&field);
+/// ```
+///
+/// # Example: Update a [`Field`] with [`FieldMetadata`]
+/// ```
+/// # use datafusion_expr::expr::FieldMetadata;
+/// # use arrow::datatypes::{Field, DataType};
+/// # let field = Field::new("c1", DataType::Int32, true);
+/// # let metadata = FieldMetadata::new_from_field(&field);
+/// // Add any metadata from `FieldMetadata` to `Field`
+/// let updated_field = metadata.add_to_field(field);
+/// ```
+///
+#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
+pub struct FieldMetadata {
+    /// The inner metadata of a literal expression, which is a map of string
+    /// keys to string values.
+    ///
+    /// Note this is not a `HashMap` because `HashMap` does not provide
+    /// implementations for traits like `Debug` and `Hash`.
+    inner: Arc<BTreeMap<String, String>>,
+}
+
+impl Default for FieldMetadata {
+    fn default() -> Self {
+        Self::new_empty()
+    }
+}
+
+impl FieldMetadata {
+    /// Create a new empty metadata instance.
+    pub fn new_empty() -> Self {
+        Self {
+            inner: Arc::new(BTreeMap::new()),
+        }
+    }
+
+    /// Merges two optional `FieldMetadata` instances, overwriting any existing
+    /// keys in `m` with keys from `n` if present.
+    ///
+    /// This function is commonly used in alias operations, particularly for literals
+    /// with metadata. When creating an alias expression, the metadata from the original
+    /// expression (such as a literal) is combined with any metadata specified on the alias.
+    ///
+    /// # Arguments
+    ///
+    /// * `m` - The first metadata (typically from the original expression like a literal)
+    /// * `n` - The second metadata (typically from the alias definition)
+    ///
+    /// # Merge Strategy
+    ///
+    /// - If both metadata instances exist, they are merged with `n` taking precedence
+    /// - Keys from `n` will overwrite keys from `m` if they have the same name
+    /// - If only one metadata instance exists, it is returned unchanged
+    /// - If neither exists, `None` is returned
+    ///
+    /// # Example usage
+    /// ```rust
+    /// use datafusion_expr::expr::FieldMetadata;
+    /// use std::collections::BTreeMap;
+    ///
+    /// // Create metadata for a literal expression
+    /// let literal_metadata = Some(FieldMetadata::from(BTreeMap::from([
+    ///     ("source".to_string(), "constant".to_string()),
+    ///     ("type".to_string(), "int".to_string()),
+    /// ])));
+    ///
+    /// // Create metadata for an alias
+    /// let alias_metadata = Some(FieldMetadata::from(BTreeMap::from([
+    ///     ("description".to_string(), "answer".to_string()),
+    ///     ("source".to_string(), "user".to_string()), // This will override literal's "source"
+    /// ])));
+    ///
+    /// // Merge the metadata
+    /// let merged = FieldMetadata::merge_options(
+    ///     literal_metadata.as_ref(),
+    ///     alias_metadata.as_ref(),
+    /// );
+    ///
+    /// // Result contains: {"source": "user", "type": "int", "description": "answer"}
+    /// assert!(merged.is_some());
+    /// ```
+    pub fn merge_options(
+        m: Option<&FieldMetadata>,
+        n: Option<&FieldMetadata>,
+    ) -> Option<FieldMetadata> {
+        match (m, n) {
+            (Some(m), Some(n)) => {
+                let mut merged = m.clone();
+                merged.extend(n.clone());
+                Some(merged)
+            }
+            (Some(m), None) => Some(m.clone()),
+            (None, Some(n)) => Some(n.clone()),
+            (None, None) => None,
+        }
+    }
+
+    /// Create a new metadata instance from a `Field`'s metadata.
+    pub fn new_from_field(field: &Field) -> Self {
+        let inner = field
+            .metadata()
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect();
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+
+    /// Create a new metadata instance from a map of string keys to string values.
+    pub fn new(inner: BTreeMap<String, String>) -> Self {
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+
+    /// Get the inner metadata as a reference to a `BTreeMap`.
+    pub fn inner(&self) -> &BTreeMap<String, String> {
+        &self.inner
+    }
+
+    /// Return the inner metadata
+    pub fn into_inner(self) -> Arc<BTreeMap<String, String>> {
+        self.inner
+    }
+
+    /// Adds metadata from `other` into `self`, overwriting any existing keys.
+    pub fn extend(&mut self, other: Self) {
+        if other.is_empty() {
+            return;
+        }
+        let other = Arc::unwrap_or_clone(other.into_inner());
+        Arc::make_mut(&mut self.inner).extend(other);
+    }
+
+    /// Returns true if the metadata is empty.
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Returns the number of key-value pairs in the metadata.
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+
+    /// Convert this `FieldMetadata` into a `HashMap<String, String>`
+    pub fn to_hashmap(&self) -> std::collections::HashMap<String, String> {
+        self.inner
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect()
+    }
+
+    /// Updates the metadata on the Field with this metadata, if it is not empty.
+    pub fn add_to_field(&self, field: Field) -> Field {
+        if self.inner.is_empty() {
+            return field;
+        }
+
+        field.with_metadata(self.to_hashmap())
+    }
+}
+
+impl From<&Field> for FieldMetadata {
+    fn from(field: &Field) -> Self {
+        Self::new_from_field(field)
+    }
+}
+
+impl From<BTreeMap<String, String>> for FieldMetadata {
+    fn from(inner: BTreeMap<String, String>) -> Self {
+        Self::new(inner)
+    }
+}
+
+impl From<std::collections::HashMap<String, String>> for FieldMetadata {
+    fn from(map: std::collections::HashMap<String, String>) -> Self {
+        Self::new(map.into_iter().collect())
+    }
+}
+
+/// From reference
+impl From<&std::collections::HashMap<String, String>> for FieldMetadata {
+    fn from(map: &std::collections::HashMap<String, String>) -> Self {
+        let inner = map
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect();
+        Self::new(inner)
+    }
+}
+
+/// From hashbrown map
+impl From<HashMap<String, String>> for FieldMetadata {
+    fn from(map: HashMap<String, String>) -> Self {
+        let inner = map.into_iter().collect();
+        Self::new(inner)
+    }
+}
+
+impl From<&HashMap<String, String>> for FieldMetadata {
+    fn from(map: &HashMap<String, String>) -> Self {
+        let inner = map
+            .into_iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect();
+        Self::new(inner)
+    }
+}
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index aa538f6dee81..11685b4c17ea 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -33,6 +33,7 @@ use arrow::error::ArrowError;
 use arrow::util::pretty::pretty_format_batches;
 use arrow_schema::{SortOptions, TimeUnit};
 use datafusion::{assert_batches_eq, dataframe};
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_functions_aggregate::count::{count_all, count_all_window};
 use datafusion_functions_aggregate::expr_fn::{
     array_agg, avg, avg_distinct, count, count_distinct, max, median, min, sum,
@@ -71,9 +72,7 @@
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::file_format::format_as_file_type;
 use datafusion_execution::config::SessionConfig;
 use datafusion_execution::runtime_env::RuntimeEnv;
-use datafusion_expr::expr::{
-    FieldMetadata, GroupingSet, NullTreatment, Sort, WindowFunction,
-};
+use datafusion_expr::expr::{GroupingSet, NullTreatment, Sort, WindowFunction};
 use datafusion_expr::var_provider::{VarProvider, VarType};
 use datafusion_expr::{
     cast, col, create_udf, exists, in_subquery, lit, out_ref_col, placeholder,
diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index f1af66de9b59..fb1371da6ceb 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -34,13 +34,13 @@ use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionS
 use datafusion::prelude::*;
 use datafusion::{execution::registry::FunctionRegistry, test_util};
 use datafusion_common::cast::{as_float64_array, as_int32_array};
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::utils::take_function_args;
 use datafusion_common::{
     assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_datafusion_err,
     exec_err, not_impl_err, plan_err, DFSchema, DataFusionError, Result, ScalarValue,
 };
-use datafusion_expr::expr::FieldMetadata;
 use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
 use datafusion_expr::{
     lit_with_metadata, Accumulator, ColumnarValue, CreateFunction, CreateFunctionBody,
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index a12cf268de2b..b47f7412b121 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -18,7 +18,7 @@
 //! Logical Expressions: [`Expr`]
 
 use std::cmp::Ordering;
-use std::collections::{BTreeMap, HashSet};
+use std::collections::HashSet;
 use std::fmt::{self, Display, Formatter, Write};
 use std::hash::{Hash, Hasher};
 use std::mem;
@@ -32,6 +32,7 @@ use crate::{ExprSchemable, Operator, Signature, WindowFrame, WindowUDF};
 
 use arrow::datatypes::{DataType, Field, FieldRef};
 use datafusion_common::cse::{HashNode, NormalizeEq, Normalizeable};
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeContainer, TreeNodeRecursion,
 };
@@ -447,235 +448,6 @@ impl<'a> TreeNodeContainer<'a, Self> for Expr {
     }
 }
 
-/// Literal metadata
-///
-/// Stores metadata associated with a literal expressions
-/// and is designed to be fast to `clone`.
-///
-/// This structure is used to store metadata associated with a literal expression, and it
-/// corresponds to the `metadata` field on [`Field`].
-///
-/// # Example: Create [`FieldMetadata`] from a [`Field`]
-/// ```
-/// # use std::collections::HashMap;
-/// # use datafusion_expr::expr::FieldMetadata;
-/// # use arrow::datatypes::{Field, DataType};
-/// # let field = Field::new("c1", DataType::Int32, true)
-/// #   .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));
-/// // Create a new `FieldMetadata` instance from a `Field`
-/// let metadata = FieldMetadata::new_from_field(&field);
-/// // There is also a `From` impl:
-/// let metadata = FieldMetadata::from(&field);
-/// ```
-///
-/// # Example: Update a [`Field`] with [`FieldMetadata`]
-/// ```
-/// # use datafusion_expr::expr::FieldMetadata;
-/// # use arrow::datatypes::{Field, DataType};
-/// # let field = Field::new("c1", DataType::Int32, true);
-/// # let metadata = FieldMetadata::new_from_field(&field);
-/// // Add any metadata from `FieldMetadata` to `Field`
-/// let updated_field = metadata.add_to_field(field);
-/// ```
-///
-#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
-pub struct FieldMetadata {
-    /// The inner metadata of a literal expression, which is a map of string
-    /// keys to string values.
-    ///
-    /// Note this is not a `HashMap` because `HashMap` does not provide
-    /// implementations for traits like `Debug` and `Hash`.
-    inner: Arc<BTreeMap<String, String>>,
-}
-
-impl Default for FieldMetadata {
-    fn default() -> Self {
-        Self::new_empty()
-    }
-}
-
-impl FieldMetadata {
-    /// Create a new empty metadata instance.
-    pub fn new_empty() -> Self {
-        Self {
-            inner: Arc::new(BTreeMap::new()),
-        }
-    }
-
-    /// Merges two optional `FieldMetadata` instances, overwriting any existing
-    /// keys in `m` with keys from `n` if present.
-    ///
-    /// This function is commonly used in alias operations, particularly for literals
-    /// with metadata. When creating an alias expression, the metadata from the original
-    /// expression (such as a literal) is combined with any metadata specified on the alias.
-    ///
-    /// # Arguments
-    ///
-    /// * `m` - The first metadata (typically from the original expression like a literal)
-    /// * `n` - The second metadata (typically from the alias definition)
-    ///
-    /// # Merge Strategy
-    ///
-    /// - If both metadata instances exist, they are merged with `n` taking precedence
-    /// - Keys from `n` will overwrite keys from `m` if they have the same name
-    /// - If only one metadata instance exists, it is returned unchanged
-    /// - If neither exists, `None` is returned
-    ///
-    /// # Example usage
-    /// ```rust
-    /// use datafusion_expr::expr::FieldMetadata;
-    /// use std::collections::BTreeMap;
-    ///
-    /// // Create metadata for a literal expression
-    /// let literal_metadata = Some(FieldMetadata::from(BTreeMap::from([
-    ///     ("source".to_string(), "constant".to_string()),
-    ///     ("type".to_string(), "int".to_string()),
-    /// ])));
-    ///
-    /// // Create metadata for an alias
-    /// let alias_metadata = Some(FieldMetadata::from(BTreeMap::from([
-    ///     ("description".to_string(), "answer".to_string()),
-    ///     ("source".to_string(), "user".to_string()), // This will override literal's "source"
-    /// ])));
-    ///
-    /// // Merge the metadata
-    /// let merged = FieldMetadata::merge_options(
-    ///     literal_metadata.as_ref(),
-    ///     alias_metadata.as_ref(),
-    /// );
-    ///
-    /// // Result contains: {"source": "user", "type": "int", "description": "answer"}
-    /// assert!(merged.is_some());
-    /// ```
-    pub fn merge_options(
-        m: Option<&FieldMetadata>,
-        n: Option<&FieldMetadata>,
-    ) -> Option<FieldMetadata> {
-        match (m, n) {
-            (Some(m), Some(n)) => {
-                let mut merged = m.clone();
-                merged.extend(n.clone());
-                Some(merged)
-            }
-            (Some(m), None) => Some(m.clone()),
-            (None, Some(n)) => Some(n.clone()),
-            (None, None) => None,
-        }
-    }
-
-    /// Create a new metadata instance from a `Field`'s metadata.
-    pub fn new_from_field(field: &Field) -> Self {
-        let inner = field
-            .metadata()
-            .iter()
-            .map(|(k, v)| (k.to_string(), v.to_string()))
-            .collect();
-        Self {
-            inner: Arc::new(inner),
-        }
-    }
-
-    /// Create a new metadata instance from a map of string keys to string values.
-    pub fn new(inner: BTreeMap<String, String>) -> Self {
-        Self {
-            inner: Arc::new(inner),
-        }
-    }
-
-    /// Get the inner metadata as a reference to a `BTreeMap`.
-    pub fn inner(&self) -> &BTreeMap<String, String> {
-        &self.inner
-    }
-
-    /// Return the inner metadata
-    pub fn into_inner(self) -> Arc<BTreeMap<String, String>> {
-        self.inner
-    }
-
-    /// Adds metadata from `other` into `self`, overwriting any existing keys.
-    pub fn extend(&mut self, other: Self) {
-        if other.is_empty() {
-            return;
-        }
-        let other = Arc::unwrap_or_clone(other.into_inner());
-        Arc::make_mut(&mut self.inner).extend(other);
-    }
-
-    /// Returns true if the metadata is empty.
-    pub fn is_empty(&self) -> bool {
-        self.inner.is_empty()
-    }
-
-    /// Returns the number of key-value pairs in the metadata.
-    pub fn len(&self) -> usize {
-        self.inner.len()
-    }
-
-    /// Convert this `FieldMetadata` into a `HashMap<String, String>`
-    pub fn to_hashmap(&self) -> std::collections::HashMap<String, String> {
-        self.inner
-            .iter()
-            .map(|(k, v)| (k.to_string(), v.to_string()))
-            .collect()
-    }
-
-    /// Updates the metadata on the Field with this metadata, if it is not empty.
-    pub fn add_to_field(&self, field: Field) -> Field {
-        if self.inner.is_empty() {
-            return field;
-        }
-
-        field.with_metadata(self.to_hashmap())
-    }
-}
-
-impl From<&Field> for FieldMetadata {
-    fn from(field: &Field) -> Self {
-        Self::new_from_field(field)
-    }
-}
-
-impl From<BTreeMap<String, String>> for FieldMetadata {
-    fn from(inner: BTreeMap<String, String>) -> Self {
-        Self::new(inner)
-    }
-}
-
-impl From<std::collections::HashMap<String, String>> for FieldMetadata {
-    fn from(map: std::collections::HashMap<String, String>) -> Self {
-        Self::new(map.into_iter().collect())
-    }
-}
-
-/// From reference
-impl From<&std::collections::HashMap<String, String>> for FieldMetadata {
-    fn from(map: &std::collections::HashMap<String, String>) -> Self {
-        let inner = map
-            .iter()
-            .map(|(k, v)| (k.to_string(), v.to_string()))
-            .collect();
-        Self::new(inner)
-    }
-}
-
-/// From hashbrown map
-impl From<HashMap<String, String>> for FieldMetadata {
-    fn from(map: HashMap<String, String>) -> Self {
-        let inner = map.into_iter().collect();
-        Self::new(inner)
-    }
-}
-
-impl From<&HashMap<String, String>> for FieldMetadata {
-    fn from(map: &HashMap<String, String>) -> Self {
-        let inner = map
-            .into_iter()
-            .map(|(k, v)| (k.to_string(), v.to_string()))
-            .collect();
-        Self::new(inner)
-    }
-}
-
 /// The metadata used in [`Field::metadata`].
 ///
 /// This represents the metadata associated with an Arrow [`Field`]. The metadata consists of key-value pairs.
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index 00708fd3411d..94b855addcaa 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -17,8 +17,8 @@
 
 use super::{Between, Expr, Like};
 use crate::expr::{
-    AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, FieldMetadata,
-    InList, InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
+    AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
+    InSubquery, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
     WindowFunctionParams,
 };
 use crate::type_coercion::functions::{
@@ -28,6 +28,7 @@ use crate::udf::ReturnFieldArgs;
 use crate::{utils, LogicalPlan, Projection, Subquery, WindowFunctionDefinition};
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, FieldRef};
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::{
     not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, ExprSchema,
     Result, Spans, TableReference,
diff --git a/datafusion/expr/src/literal.rs b/datafusion/expr/src/literal.rs
index c4bd43bc0a62..335d7b471f5f 100644
--- a/datafusion/expr/src/literal.rs
+++ b/datafusion/expr/src/literal.rs
@@ -17,9 +17,8 @@
 
 //! Literal module contains foundational types that are used to represent literals in DataFusion.
 
-use crate::expr::FieldMetadata;
 use crate::Expr;
-use datafusion_common::ScalarValue;
+use datafusion_common::{metadata::FieldMetadata, ScalarValue};
 
 /// Create a literal expression
 pub fn lit<T: Literal>(n: T) -> Expr {
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 7a283b0420d3..c04b9c015615 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -25,7 +25,7 @@
 use std::iter::once;
 use std::sync::Arc;
 
 use crate::dml::CopyTo;
-use crate::expr::{Alias, FieldMetadata, PlannedReplaceSelectItem, Sort as SortExpr};
+use crate::expr::{Alias, PlannedReplaceSelectItem, Sort as SortExpr};
 use crate::expr_rewriter::{
     coerce_plan_expr_for_schema, normalize_col,
     normalize_col_with_schemas_and_ambiguity_check, normalize_cols, normalize_sorts,
@@ -53,6 +53,7 @@
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::file_type::FileType;
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::{
     exec_err, get_target_functional_dependencies, internal_datafusion_err, not_impl_err,
     plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef,
@@ -2708,12 +2709,12 @@ mod tests {
 
         assert_snapshot!(plan, @r"
         Union
-          Cross Join: 
+          Cross Join:
             SubqueryAlias: left
               Values: (Int32(1))
             SubqueryAlias: right
               Values: (Int32(1))
-          Cross Join: 
+          Cross Join:
             SubqueryAlias: left
               Values: (Int32(1))
             SubqueryAlias: right
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index 4710ece24f91..07a614340e6c 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -29,6 +29,7 @@
 use std::sync::Arc;
 
 use datafusion_common::{
     cast::{as_large_list_array, as_list_array},
+    metadata::FieldMetadata,
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRewriter},
 };
 use datafusion_common::{
@@ -57,7 +58,6 @@ use crate::simplify_expressions::unwrap_cast::{
     unwrap_cast_in_comparison_for_binary,
 };
 use crate::simplify_expressions::SimplifyInfo;
-use datafusion_expr::expr::FieldMetadata;
 use datafusion_expr_common::casts::try_cast_literal_to_type;
 use indexmap::IndexSet;
 use regex::Regex;
diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs
index 6e425ee439d6..94e91d43a1c4 100644
--- a/datafusion/physical-expr/src/expressions/literal.rs
+++ b/datafusion/physical-expr/src/expressions/literal.rs
@@ -28,8 +28,8 @@ use arrow::{
     datatypes::{DataType, Schema},
     record_batch::RecordBatch,
 };
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::{Result, ScalarValue};
-use datafusion_expr::expr::FieldMetadata;
 use datafusion_expr::Expr;
 use datafusion_expr_common::columnar_value::ColumnarValue;
 use datafusion_expr_common::interval_arithmetic::Interval;
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 73df60c42e96..7790380dffd5 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -25,13 +25,12 @@ use crate::{
 
 use arrow::datatypes::Schema;
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::{
     exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
 };
 use datafusion_expr::execution_props::ExecutionProps;
-use datafusion_expr::expr::{
-    Alias, Cast, FieldMetadata, InList, Placeholder, ScalarFunction,
-};
+use datafusion_expr::expr::{Alias, Cast, InList, Placeholder, ScalarFunction};
 use datafusion_expr::var_provider::is_system_variables;
 use datafusion_expr::var_provider::VarType;
 use datafusion_expr::{
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index 8b59c97da432..c912da66cef7 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -608,7 +608,8 @@ pub fn serialize_expr(
                     None => None,
                 },
                 nullable: field.as_ref().map(|f| f.is_nullable()),
-                metadata: field.as_ref()
+                metadata: field
+                    .as_ref()
                     .map(|f| f.metadata().clone())
                     .unwrap_or(HashMap::new()),
             })),

From e764884356033cfad40d36d01dbbafc33428e58e Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 14:18:53 -0500
Subject: [PATCH 05/11] maybe working from proto

---
 datafusion/expr/src/expr.rs              |  5 +++++
 .../proto/src/logical_plan/from_proto.rs | 22 ++++++++++++++-----
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index b47f7412b121..9ee058e1a08b 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1153,6 +1153,11 @@ impl Placeholder {
             field: data_type.map(|dt| Arc::new(Field::new("", dt, true))),
         }
     }
+
+    /// Create a new Placeholder expression from a Field
+    pub fn new_with_metadata(id: String, field: Option<FieldRef>) -> Self {
+        Self { id, field }
+    }
 }
 
 /// Grouping sets
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs
index cbfa15183b5c..ae395afaeb23 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -17,6 +17,7 @@
 
 use std::sync::Arc;
 
+use arrow::datatypes::Field;
 use datafusion::execution::registry::FunctionRegistry;
 use datafusion_common::{
     exec_datafusion_err, internal_err, plan_datafusion_err, NullEquality,
@@ -595,12 +596,23 @@ pub fn parse_expr(
         ExprType::Rollup(RollupNode { expr }) => Ok(Expr::GroupingSet(
             GroupingSet::Rollup(parse_exprs(expr, registry, codec)?),
         )),
-        ExprType::Placeholder(PlaceholderNode { id, data_type }) => match data_type {
+        ExprType::Placeholder(PlaceholderNode {
+            id,
+            data_type,
+            nullable,
+            metadata,
+        }) => match data_type {
             None => Ok(Expr::Placeholder(Placeholder::new(id.clone(), None))),
-            Some(data_type) => Ok(Expr::Placeholder(Placeholder::new(
-                id.clone(),
-                Some(data_type.try_into()?),
-            ))),
+            Some(data_type) => {
+                // Rebuild the full field, defaulting to nullable when the
+                // serialized placeholder did not record nullability
+                let field =
+                    Field::new("", data_type.try_into()?, nullable.unwrap_or(true))
+                        .with_metadata(metadata.clone());
+                Ok(Expr::Placeholder(Placeholder::new_with_metadata(
+                    id.clone(),
+                    Some(field.into()),
+                )))
+            }
        },
    }
 }

From 02940ff531d21a1378dbdf23f5c3c552f094363e Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 15:29:08 -0500
Subject: [PATCH 06/11] oof

---
 datafusion/common/src/param_value.rs          | 45 ++++++++++++++-----
 datafusion/core/src/execution/context/mod.rs  | 11 +++--
 .../core/src/execution/session_state.rs       | 11 ++---
 datafusion/core/tests/dataframe/mod.rs        |  4 +-
 datafusion/expr/src/logical_plan/builder.rs   |  4 +-
 datafusion/expr/src/logical_plan/plan.rs      |  2 +-
 datafusion/expr/src/logical_plan/statement.rs |  4 +-
 datafusion/sql/src/expr/mod.rs                | 11 +++--
 datafusion/sql/src/expr/value.rs              |  8 ++--
 datafusion/sql/src/planner.rs                 | 45 ++++++++++---------
 datafusion/sql/src/statement.rs               | 17 +++----
 11 files changed, 97 insertions(+), 65 deletions(-)

diff --git a/datafusion/common/src/param_value.rs b/datafusion/common/src/param_value.rs
index 7582cff56f87..8cfdff6d6e1d 100644
--- a/datafusion/common/src/param_value.rs
+++ b/datafusion/common/src/param_value.rs
@@ -16,22 +16,23 @@
 // under the License.
 
 use crate::error::{_plan_datafusion_err, _plan_err};
+use crate::metadata::FieldMetadata;
 use crate::{Result, ScalarValue};
-use arrow::datatypes::DataType;
+use arrow::datatypes::FieldRef;
 use std::collections::HashMap;
 
 /// The parameter value corresponding to the placeholder
 #[derive(Debug, Clone)]
 pub enum ParamValues {
     /// For positional query parameters, like `SELECT * FROM test WHERE a > $1 AND b = $2`
-    List(Vec<ScalarValue>),
+    List(Vec<(ScalarValue, Option<FieldMetadata>)>),
     /// For named query parameters, like `SELECT * FROM test WHERE a > $foo AND b = $goo`
-    Map(HashMap<String, ScalarValue>),
+    Map(HashMap<String, (ScalarValue, Option<FieldMetadata>)>),
 }
 
 impl ParamValues {
     /// Verify parameter list length and type
-    pub fn verify(&self, expect: &[DataType]) -> Result<()> {
+    pub fn verify(&self, expect: &[FieldRef]) -> Result<()> {
         match self {
             ParamValues::List(list) => {
                 // Verify if the number of params matches the number of values
@@ -45,8 +46,8 @@ impl ParamValues {
 
                 // Verify if the types of the params matches the types of the values
                 let iter = expect.iter().zip(list.iter());
-                for (i, (param_type, value)) in iter.enumerate() {
-                    if *param_type != value.data_type() {
+                for (i, (param_type, (value, maybe_metadata))) in iter.enumerate() {
+                    if *param_type.data_type() != value.data_type() {
                         return _plan_err!(
                             "Expected parameter of type {}, got {:?} at index {}",
                             param_type,
                             value.data_type(),
                             i
                         );
                     }
+
+                    if let Some(expected_metadata) = maybe_metadata {
+                        // Probably too strict of a comparison (this is an example of where
+                        // the concept of type equality would be useful)
+                        if &expected_metadata.to_hashmap() != param_type.metadata() {
+                            return _plan_err!(
+                                "Expected parameter with metadata {:?}, got {:?} at index {}",
+                                expected_metadata,
+                                param_type.metadata(),
+                                i
+                            );
+                        }
+                    }
                 }
                 Ok(())
             }
@@ -65,7 +79,10 @@ impl ParamValues {
         }
     }
 
-    pub fn get_placeholders_with_values(&self, id: &str) -> Result<ScalarValue> {
+    pub fn get_placeholders_with_values(
+        &self,
+        id: &str,
+    ) -> Result<(ScalarValue, Option<FieldMetadata>)> {
         match self {
             ParamValues::List(list) => {
                 if id.is_empty() {
@@ -99,7 +116,7 @@ impl ParamValues {
 
 impl From<Vec<ScalarValue>> for ParamValues {
     fn from(value: Vec<ScalarValue>) -> Self {
-        Self::List(value)
+        Self::List(value.into_iter().map(|v| (v, None)).collect())
     }
 }
 
 impl<K> From<Vec<(K, ScalarValue)>> for ParamValues
 where
     K: Into<String>,
 {
     fn from(value: Vec<(K, ScalarValue)>) -> Self {
-        let value: HashMap<String, ScalarValue> =
-            value.into_iter().map(|(k, v)| (k.into(), v)).collect();
+        let value: HashMap<String, (ScalarValue, Option<FieldMetadata>)> = value
+            .into_iter()
+            .map(|(k, v)| (k.into(), (v, None)))
+            .collect();
         Self::Map(value)
     }
 }
 
 impl<K> From<HashMap<K, ScalarValue>> for ParamValues
 where
     K: Into<String>,
 {
     fn from(value: HashMap<K, ScalarValue>) -> Self {
-        let value: HashMap<String, ScalarValue> =
-            value.into_iter().map(|(k, v)| (k.into(), v)).collect();
+        let value: HashMap<String, (ScalarValue, Option<FieldMetadata>)> = value
+            .into_iter()
+            .map(|(k, v)| (k.into(), (v, None)))
+            .collect();
         Self::Map(value)
     }
 }
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index a8148b80495e..f04b9a9ce0bb 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -64,6 +64,7 @@ use datafusion_catalog::{
     DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory,
 };
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::metadata::FieldMetadata;
 use datafusion_common::{
     config::{ConfigExtension, TableOptions},
     exec_datafusion_err, exec_err, internal_datafusion_err, not_impl_err,
@@ -1238,10 +1239,10 @@ impl SessionContext {
         })?;
 
         // Only allow literals as parameters for now.
-        let mut params: Vec<ScalarValue> = parameters
+        let mut params: Vec<(ScalarValue, Option<FieldMetadata>)> = parameters
             .into_iter()
             .map(|e| match e {
-                Expr::Literal(scalar, _) => Ok(scalar),
+                Expr::Literal(scalar, metadata) => Ok((scalar, metadata)),
                 _ => not_impl_err!("Unsupported parameter type: {}", e),
             })
            .collect::<Result<_>>()?;
@@ -1259,7 +1260,11 @@
             params = params
                 .into_iter()
                 .zip(prepared.data_types.iter())
-                .map(|(e, dt)| e.cast_to(dt))
+                .map(|(e, dt)| -> Result<_> {
+                    // This is fishy...we're casting storage without checking if an
+                    // extension type supports the destination
+                    Ok((e.0.cast_to(dt.data_type())?, e.1))
+                })
                .collect::<Result<_>>()?;
         }
 
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index b04004dd495c..9da3bfb29d4e 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -30,6 +30,7 @@ use crate::datasource::provider_as_source;
 use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
 use crate::execution::SessionStateDefaults;
 use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
+use arrow_schema::FieldRef;
 use datafusion_catalog::information_schema::{
     InformationSchemaProvider, INFORMATION_SCHEMA,
 };
@@ -116,11 +117,11 @@ use uuid::Uuid;
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
 /// let state = SessionStateBuilder::new()
-///     .with_config(SessionConfig::new())  
+///     .with_config(SessionConfig::new())
 ///     .with_runtime_env(Arc::new(RuntimeEnv::default()))
 ///     .with_default_features()
 ///     .build();
-/// Ok(())  
+/// Ok(())
 /// # }
 /// ```
 ///
@@ -873,7 +874,7 @@ impl SessionState {
     pub(crate) fn store_prepared(
         &mut self,
         name: String,
-        data_types: Vec<DataType>,
+        data_types: Vec<FieldRef>,
         plan: Arc<LogicalPlan>,
     ) -> datafusion_common::Result<()> {
         match self.prepared_plans.entry(name) {
@@ -1323,7 +1324,7 @@ impl SessionStateBuilder {
     /// let url = Url::try_from("file://").unwrap();
     /// let object_store = object_store::local::LocalFileSystem::new();
     /// let state = SessionStateBuilder::new()
-    ///     .with_config(SessionConfig::new())  
+    ///     .with_config(SessionConfig::new())
     ///     .with_object_store(&url, Arc::new(object_store))
     ///     .with_default_features()
     ///     .build();
@@ -2012,7 +2013,7 @@ impl SimplifyInfo for SessionSimplifyProvider<'_> {
 #[derive(Debug)]
 pub(crate) struct PreparedPlan {
     /// Data types of the parameters
-    pub(crate) data_types: Vec<DataType>,
+    pub(crate) data_types: Vec<FieldRef>,
     /// The prepared logical plan
     pub(crate) plan: Arc<LogicalPlan>,
 }
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 11685b4c17ea..979ada2bc6bb 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -66,7 +66,7 @@ use datafusion_catalog::TableProvider;
 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
 use datafusion_common::{
     assert_contains, internal_datafusion_err, Constraint, Constraints, DFSchema,
-    DataFusionError, ParamValues, ScalarValue, TableReference, UnnestOptions,
+    DataFusionError, ScalarValue, TableReference, UnnestOptions,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::file_format::format_as_file_type;
@@ -2464,7 +2464,7 @@ async fn filtered_aggr_with_param_values() -> Result<()> {
     let df = ctx
         .sql("select count (c2) filter (where c3 > $1) from table1")
         .await?
-        .with_param_values(ParamValues::List(vec![ScalarValue::from(10u64)]));
+        .with_param_values(vec![ScalarValue::from(10u64)]);
 
     let df_results = df?.collect().await?;
     assert_snapshot!(
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index c04b9c015615..e291349c7a66 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -50,7 +50,7 @@ use crate::{
 
 use super::dml::InsertOp;
 use arrow::compute::can_cast_types;
-use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
+use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::metadata::FieldMetadata;
@@ -623,7 +623,7 @@ impl LogicalPlanBuilder {
     }
 
     /// Make a builder for a prepare logical plan from the builder's plan
-    pub fn prepare(self, name: String, data_types: Vec<DataType>) -> Result<Self> {
+    pub fn prepare(self, name: String, data_types: Vec<FieldRef>) -> Result<Self> {
         Ok(Self::new(LogicalPlan::Statement(Statement::Prepare(
             Prepare {
                 name,
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ebb1df9f1126..8369f0325d97 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1464,7 +1464,7 @@ impl LogicalPlan {
             let transformed_expr = e.transform_up(|e| {
                 if let Expr::Placeholder(Placeholder { id, .. }) = e {
                     let value = param_values.get_placeholders_with_values(&id)?;
-                    Ok(Transformed::yes(Expr::Literal(value, None)))
+                    Ok(Transformed::yes(Expr::Literal(value.0, value.1)))
                 } else {
                     Ok(Transformed::no(e))
                 }
diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs
index 6d3fe9fa75ac..6cff6b180423 100644
--- a/datafusion/expr/src/logical_plan/statement.rs
+++ b/datafusion/expr/src/logical_plan/statement.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::DataType;
+use arrow::datatypes::FieldRef;
 use datafusion_common::{DFSchema, DFSchemaRef};
 use itertools::Itertools as _;
 use std::fmt::{self, Display};
@@ -192,7 +192,7 @@ pub struct Prepare {
     /// The name of the statement
     pub name: String,
     /// Data types of the parameters ([`Expr::Placeholder`])
-    pub data_types: Vec<DataType>,
+    pub data_types: Vec<FieldRef>,
     /// The logical plan of the statements
     pub input: Arc<LogicalPlan>,
 }
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 23426701409e..d74415faccf4 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -287,7 +287,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                     schema,
                     planner_context,
                 )?),
-                self.convert_data_type(&data_type)?,
+                self.convert_data_type(&data_type)?.data_type().clone(),
             )))
         }
 
@@ -297,7 +297,7 @@
             uses_odbc_syntax: _,
         }) => Ok(Expr::Cast(Cast::new(
             Box::new(lit(value.into_string().unwrap())),
-            self.convert_data_type(&data_type)?,
+            self.convert_data_type(&data_type)?.data_type().clone(),
         ))),
 
         SQLExpr::IsNull(expr) => Ok(Expr::IsNull(Box::new(
@@ -974,7 +974,7 @@
         // numeric constants are treated as seconds (rather as nanoseconds)
         // to align with postgres / duckdb semantics
-        let expr = match &dt {
+        let expr = match dt.data_type() {
             DataType::Timestamp(TimeUnit::Nanosecond, tz)
                 if expr.get_type(schema)? == DataType::Int64 =>
             {
@@ -986,7 +986,10 @@
             _ => expr,
         };
 
-        Ok(Expr::Cast(Cast::new(Box::new(expr), dt)))
+        Ok(Expr::Cast(Cast::new(
+            Box::new(expr),
+            dt.data_type().clone(),
+        )))
     }
 
     /// Extracts the root expression and access chain from a compound expression.
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
index 7075a1afd9dd..d71f3e3f9ea9 100644
--- a/datafusion/sql/src/expr/value.rs
+++ b/datafusion/sql/src/expr/value.rs
@@ -20,7 +20,7 @@ use arrow::compute::kernels::cast_utils::{
     parse_interval_month_day_nano_config, IntervalParseConfig, IntervalUnit,
 };
 use arrow::datatypes::{
-    i256, DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
+    i256, FieldRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
 };
 use bigdecimal::num_bigint::BigInt;
 use bigdecimal::{BigDecimal, Signed, ToPrimitive};
@@ -45,7 +45,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
     pub(crate) fn parse_value(
         &self,
         value: Value,
-        param_data_types: &[DataType],
+        param_data_types: &[FieldRef],
     ) -> Result<Expr> {
         match value {
             Value::Number(n, _) => self.parse_sql_number(&n, false),
@@ -108,7 +108,7 @@
     /// number 1, 2, ... etc. For example, `$1` is the first placeholder; $2 is the second one and so on.
     fn create_placeholder_expr(
         param: String,
-        param_data_types: &[DataType],
+        param_data_types: &[FieldRef],
     ) -> Result<Expr> {
         // Parse the placeholder as a number because it is the only support from sqlparser and postgres
         let index = param[1..].parse::<usize>();
@@ -133,7 +133,7 @@
         // Data type of the parameter
         debug!("type of param {param} param_data_types[idx]: {param_type:?}");
 
-        Ok(Expr::Placeholder(Placeholder::new(
+        Ok(Expr::Placeholder(Placeholder::new_with_metadata(
             param,
             param_type.cloned(),
         )))
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index e93c5e066b66..a233e0f1fdb1 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -256,7 +256,7 @@ impl IdentNormalizer {
 pub struct PlannerContext {
     /// Data types for numbered parameters ($1, $2, etc), if supplied
     /// in `PREPARE` statement
-    prepare_param_data_types: Arc<Vec<DataType>>,
+    prepare_param_data_types: Arc<Vec<FieldRef>>,
     /// Map of CTE name to logical plan of the WITH clause.
     /// Use `Arc` to allow cheap cloning
     ctes: HashMap<String, Arc<LogicalPlan>>,
@@ -290,7 +290,7 @@ impl PlannerContext {
     /// Update the PlannerContext with provided prepare_param_data_types
     pub fn with_prepare_param_data_types(
         mut self,
-        prepare_param_data_types: Vec<DataType>,
+        prepare_param_data_types: Vec<FieldRef>,
     ) -> Self {
         self.prepare_param_data_types = prepare_param_data_types.into();
         self
@@ -347,7 +347,7 @@ impl PlannerContext {
     }
 
     /// Return the types of parameters (`$1`, `$2`, etc) if known
-    pub fn prepare_param_data_types(&self) -> &[DataType] {
+    pub fn prepare_param_data_types(&self) -> &[FieldRef] {
         &self.prepare_param_data_types
     }
 
@@ -433,11 +433,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 .options
                 .iter()
                 .any(|x| x.option == ColumnOption::NotNull);
-            fields.push(Field::new(
-                self.ident_normalizer.normalize(column.name),
-                data_type,
-                !not_nullable,
-            ));
+            fields.push(
+                data_type
+                    .as_ref()
+                    .clone()
+                    .with_name(self.ident_normalizer.normalize(column.name))
+                    .with_nullable(!not_nullable),
+            );
         }
 
         Ok(Schema::new(fields))
@@ -587,11 +589,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         })
     }
 
-    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
+    pub(crate) fn convert_data_type(&self, sql_type: &SQLDataType) -> Result<FieldRef> {
         // First check if any of the registered type_planner can handle this type
         if let Some(type_planner) = self.context_provider.get_type_planner() {
             if let Some(data_type) = type_planner.plan_type(sql_type)? {
                 return Ok(Field::new("", data_type, true).into());
             }
         }
 
            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
                // Arrays may be multi-dimensional.
                 let inner_data_type = self.convert_data_type(inner_sql_type)?;
-                Ok(DataType::new_list(inner_data_type, true))
+                Ok(Field::new("", DataType::List(inner_data_type), true).into())
             }
             SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
                 inner_sql_type,
                 maybe_array_size,
             )) => {
                 let inner_data_type = self.convert_data_type(inner_sql_type)?;
                 if let Some(array_size) = maybe_array_size {
-                    Ok(DataType::new_fixed_size_list(
-                        inner_data_type,
-                        *array_size as i32,
+                    Ok(Field::new(
+                        "",
+                        DataType::FixedSizeList(inner_data_type, *array_size as i32),
                         true,
-                    ))
+                    )
+                    .into())
                 } else {
-                    Ok(DataType::new_list(inner_data_type, true))
+                    Ok(Field::new("", DataType::List(inner_data_type), true).into())
                 }
             }
             SQLDataType::Array(ArrayElemTypeDef::None) => {
                 not_impl_err!("Arrays with unspecified type is not supported")
             }
-            other => self.convert_simple_data_type(other),
+            other => {
+                Ok(Field::new("", self.convert_simple_data_type(other)?, true).into())
+            }
         }
     }
@@ -739,11 +744,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 Some(ident) => ident.clone(),
                 None => Ident::new(format!("c{idx}")),
             };
-            Ok(Arc::new(Field::new(
-                self.ident_normalizer.normalize(field_name),
-                data_type,
-                true,
-            )))
+            Ok(data_type.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
         })
         .collect::<Result<Vec<_>>>()?;
     Ok(DataType::Struct(Fields::from(fields)))
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index d1d64091c8cc..87076950d91a 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -29,7 +29,7 @@ use crate::planner::{
 };
 use crate::utils::normalize_ident;

-use arrow::datatypes::{DataType, Fields};
+use arrow::datatypes::{Field, FieldRef, Fields};
 use datafusion_common::error::_plan_err;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
@@ -730,7 +730,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
             statement,
         } => {
             // Convert parser data types to DataFusion data types
-            let mut data_types: Vec<DataType> = data_types
+            let mut data_types: Vec<FieldRef> = data_types
                 .into_iter()
                 .map(|t| self.convert_data_type(&t))
                 .collect::<Result<_>>()?;
@@ -746,7 +746,7 @@
             )?;

             if data_types.is_empty() {
-                let map_types = plan.get_parameter_types()?;
+                let map_types = plan.get_parameter_fields()?;
                 let param_types: Vec<_> = (1..=map_types.len())
                     .filter_map(|i| {
                         let key = format!("${i}");
@@ -1203,7 +1203,7 @@
                 Ok(OperateFunctionArg {
                     name: arg.name,
                     default_expr,
-                    data_type,
+                    data_type: data_type.data_type().clone(),
                 })
             })
             .collect::<Result<Vec<_>>>();
@@ -1221,7 +1221,9 @@
         // Convert resulting expression to data fusion expression
         //
         let arg_types = args.as_ref().map(|arg| {
-            arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
+            arg.iter()
+                .map(|t| Arc::new(Field::new("", t.data_type.clone(), true)))
+                .collect::<Vec<_>>()
         });
         let mut planner_context = PlannerContext::new()
             .with_prepare_param_data_types(arg_types.unwrap_or_default());
@@ -1264,7 +1266,7 @@
             or_replace,
             temporary,
             name,
-            return_type,
+            return_type: return_type.map(|f| f.data_type().clone()),
             args,
             params,
             schema: DFSchemaRef::new(DFSchema::empty()),
@@ -2105,8 +2107,7 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
                         idx + 1
                     )
                 })?;
-                let dt = field.data_type().clone();
-                let _ = prepare_param_data_types.insert(name, dt);
+                let _ = prepare_param_data_types.insert(name, Arc::clone(field));
             }
         }
     }

From 83e37d63ebb3cdcf6dc276e9f20e526165579f18 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 15:58:37 -0500
Subject: [PATCH 07/11] maybe fix build

---
 datafusion/proto/proto/datafusion.proto  |  1 +
 datafusion/proto/src/generated/pbjson.rs | 17 ++++++++++++
 datafusion/proto/src/generated/prost.rs  |  2 ++
 datafusion/proto/src/logical_plan/mod.rs | 35 ++++++++++++++++++++----
 4 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index b62eae739f40..1e6e4a0397c3 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -181,6 +181,7 @@ message PrepareNode {
   string name = 1;
   repeated datafusion_common.ArrowType data_types = 2;
   LogicalPlanNode input = 3;
+  repeated datafusion_common.Field fields = 4;
 }

 message CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 1754a1e77784..f8cce7afc2ea 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -18834,6 +18834,9 @@ impl serde::Serialize for PrepareNode {
         if self.input.is_some() {
             len += 1;
         }
+        if !self.fields.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.PrepareNode", len)?;
         if !self.name.is_empty() {
             struct_ser.serialize_field("name", &self.name)?;
@@ -18844,6 +18847,9 @@
         if let Some(v) = self.input.as_ref() {
             struct_ser.serialize_field("input", v)?;
         }
+        if !self.fields.is_empty() {
+            struct_ser.serialize_field("fields", &self.fields)?;
+        }
         struct_ser.end()
     }
 }
@@ -18858,6 +18864,7 @@ impl<'de> serde::Deserialize<'de> for PrepareNode {
             "data_types",
             "dataTypes",
             "input",
+            "fields",
         ];

         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Name,
             DataTypes,
             Input,
+            Fields,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -18889,6 +18897,7 @@
                         "name" => Ok(GeneratedField::Name),
                         "dataTypes" | "data_types" => Ok(GeneratedField::DataTypes),
                         "input" => Ok(GeneratedField::Input),
+                        "fields" => Ok(GeneratedField::Fields),
                         _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                     }
                 }
@@ -18911,6 +18920,7 @@
                 let mut name__ = None;
                 let mut data_types__ = None;
                 let mut input__ = None;
+                let mut fields__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Name => {
@@ -18931,12 +18941,19 @@
                             }
                             input__ = map_.next_value()?;
                         }
+                        GeneratedField::Fields => {
+                            if fields__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("fields"));
+                            }
+                            fields__ = Some(map_.next_value()?);
+                        }
                     }
                 }
                 Ok(PrepareNode {
                     name: name__.unwrap_or_default(),
                     data_types: data_types__.unwrap_or_default(),
                     input: input__,
+                    fields: fields__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 51ce9b4f8565..101b18f5ea5d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -282,6 +282,8 @@ pub struct PrepareNode {
     pub data_types: ::prost::alloc::vec::Vec<datafusion_common::ArrowType>,
     #[prost(message, optional, boxed, tag = "3")]
     pub input: ::core::option::Option<::prost::alloc::boxed::Box<LogicalPlanNode>>,
+    #[prost(message, repeated, tag = "4")]
+    pub fields: ::prost::alloc::vec::Vec<datafusion_common::Field>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index fd9e07914b07..55e778b53155 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -33,7 +33,7 @@ use crate::{
 };
 use crate::protobuf::{proto_error, ToProtoError};

-use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
+use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
 use datafusion::datasource::cte_worktable::CteWorkTable;
 use datafusion::datasource::file_format::arrow::ArrowFormat;
 #[cfg(feature = "avro")]
@@ -888,9 +888,30 @@ impl AsLogicalPlan for LogicalPlanNode {
                     .iter()
                     .map(DataType::try_from)
                     .collect::<Result<_, _>>()?;
-                LogicalPlanBuilder::from(input)
-                    .prepare(prepare.name.clone(), data_types)?
-                    .build()
+                let fields: Vec<Field> = prepare
+                    .fields
+                    .iter()
+                    .map(Field::try_from)
+                    .collect::<Result<_, _>>()?;
+
+                if fields.is_empty() {
+                    LogicalPlanBuilder::from(input)
+                        .prepare(
+                            prepare.name.clone(),
+                            data_types
+                                .into_iter()
+                                .map(|dt| Field::new("", dt, true).into())
+                                .collect(),
+                        )?
+                        .build()
+                } else {
+                    LogicalPlanBuilder::from(input)
+                        .prepare(
+                            prepare.name.clone(),
+                            fields.into_iter().map(|f| f.into()).collect(),
+                        )?
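+                        // the round-tripped fields already carry name, nullability,
+                        // and metadata, so they are passed to prepare() unchanged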
+                        .build()
+                }
             }
             LogicalPlanType::DropView(dropview) => {
                 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
@@ -1632,9 +1653,13 @@ impl AsLogicalPlan for LogicalPlanNode {
                 name: name.clone(),
                 data_types: data_types
                     .iter()
-                    .map(|t| t.try_into())
+                    .map(|t| t.data_type().try_into())
                     .collect::<Result<Vec<_>, _>>()?,
                 input: Some(Box::new(input)),
+                fields: data_types
+                    .iter()
+                    .map(|f| f.as_ref().try_into())
+                    .collect::<Result<Vec<_>, _>>()?,
             },
         ))),
     })

From 97261c7911d3aa7b621015505830be827f95c0b7 Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 20:19:31 -0500
Subject: [PATCH 08/11] fix doc

---
 datafusion/common/src/metadata.rs | 6 +++---
 datafusion/expr/src/expr.rs       | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/datafusion/common/src/metadata.rs b/datafusion/common/src/metadata.rs
index 6f6f4e6adbe2..7026b49cb6fe 100644
--- a/datafusion/common/src/metadata.rs
+++ b/datafusion/common/src/metadata.rs
@@ -31,7 +31,7 @@ use hashbrown::HashMap;
 /// # Example: Create [`FieldMetadata`] from a [`Field`]
 /// ```
 /// # use std::collections::HashMap;
-/// # use datafusion_expr::expr::FieldMetadata;
+/// # use datafusion_common::metadata::FieldMetadata;
 /// # use arrow::datatypes::{Field, DataType};
 /// # let field = Field::new("c1", DataType::Int32, true)
 /// #   .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));
@@ -43,7 +43,7 @@
 ///
 /// # Example: Update a [`Field`] with [`FieldMetadata`]
 /// ```
-/// # use datafusion_expr::expr::FieldMetadata;
+/// # use datafusion_common::metadata::FieldMetadata;
 /// # use arrow::datatypes::{Field, DataType};
 /// # let field = Field::new("c1", DataType::Int32, true);
 /// # let metadata = FieldMetadata::new_from_field(&field);
@@ -96,7 +96,7 @@ impl FieldMetadata {
     ///
     /// # Example usage
     /// ```rust
-    /// use datafusion_expr::expr::FieldMetadata;
+    /// use datafusion_common::metadata::FieldMetadata;
     /// use std::collections::BTreeMap;
     ///
    /// // Create metadata for a literal expression
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 9ee058e1a08b..2bcdee2cc419 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -1623,7 +1623,7 @@ impl Expr {
     /// ```
     /// # use datafusion_expr::col;
     /// # use std::collections::HashMap;
-    /// # use datafusion_expr::expr::FieldMetadata;
+    /// # use datafusion_common::metadata::FieldMetadata;
     /// let metadata = HashMap::from([("key".to_string(), "value".to_string())]);
     /// let metadata = FieldMetadata::from(metadata);
     /// let expr = col("foo").alias_with_metadata("bar", Some(metadata));
@@ -1655,7 +1655,7 @@
     /// ```
     /// # use datafusion_expr::col;
     /// # use std::collections::HashMap;
-    /// # use datafusion_expr::expr::FieldMetadata;
+    /// # use datafusion_common::metadata::FieldMetadata;
     /// let metadata = HashMap::from([("key".to_string(), "value".to_string())]);
     /// let metadata = FieldMetadata::from(metadata);
     /// let expr = col("foo").alias_qualified_with_metadata(Some("tbl"), "bar", Some(metadata));

From 43179671ca922335eb7cf4db98b3389ffbfdaace Mon Sep 17 00:00:00 2001
From: Dewey Dunnington
Date: Thu, 9 Oct 2025 20:33:43 -0500
Subject: [PATCH 09/11] fix the name of the list

---
 datafusion/sql/src/planner.rs | 23 ++++++++++++++++++---
 1 file changed, 20 insertions(+), 3 deletions(-)

diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index a233e0f1fdb1..108e5ad52536 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -602,7 +602,14 @@ impl<'a,
S: ContextProvider> SqlToRel<'a, S> { SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => { // Arrays may be multi-dimensional. let inner_data_type = self.convert_data_type(inner_sql_type)?; - Ok(Field::new("", DataType::List(inner_data_type), true).into()) + Ok(Field::new( + "", + DataType::List( + inner_data_type.as_ref().clone().with_name("item").into(), + ), + true, + ) + .into()) } SQLDataType::Array(ArrayElemTypeDef::SquareBracket( inner_sql_type, @@ -612,12 +619,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if let Some(array_size) = maybe_array_size { Ok(Field::new( "", - DataType::FixedSizeList(inner_data_type, *array_size as i32), + DataType::FixedSizeList( + inner_data_type.as_ref().clone().with_name("item").into(), + *array_size as i32, + ), true, ) .into()) } else { - Ok(Field::new("", DataType::List(inner_data_type), true).into()) + Ok(Field::new( + "", + DataType::List( + inner_data_type.as_ref().clone().with_name("item").into(), + ), + true, + ) + .into()) } } SQLDataType::Array(ArrayElemTypeDef::None) => { From 04cebe15504ce2ec78e208d0ca26c5d6d8ef5c43 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 9 Oct 2025 20:37:01 -0500 Subject: [PATCH 10/11] undo trailing whitespace change --- datafusion/expr/src/logical_plan/builder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index e291349c7a66..a85de130e948 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -2709,12 +2709,12 @@ mod tests { assert_snapshot!(plan, @r" Union - Cross Join: + Cross Join: SubqueryAlias: left Values: (Int32(1)) SubqueryAlias: right Values: (Int32(1)) - Cross Join: + Cross Join: SubqueryAlias: left Values: (Int32(1)) SubqueryAlias: right From 382ba99409d8e6abfd8e25cefd1c52a527de0f0e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 9 Oct 2025 21:12:26 -0500 Subject: [PATCH 11/11] maybe fix failing tests --- datafusion/sql/tests/cases/params.rs | 50 +++++++++---------- .../sqllogictest/test_files/prepare.slt | 2 +- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/datafusion/sql/tests/cases/params.rs b/datafusion/sql/tests/cases/params.rs index 343a90af3efb..555bc51a7bdc 100644 --- a/datafusion/sql/tests/cases/params.rs +++ b/datafusion/sql/tests/cases/params.rs @@ -155,13 +155,13 @@ fn test_prepare_statement_to_plan_no_param() { assert_snapshot!( plan, @r#" - Prepare: "my_plan" [Int32] + Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] Projection: person.id, person.age Filter: person.age = Int64(10) TableScan: person "# ); - assert_snapshot!(dt, @r#"Int32"#); + assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#); /////////////////// // replace params with values @@ -235,7 +235,7 @@ fn test_prepare_statement_to_plan_one_param_one_value_different_type_panic() { .unwrap_err() .strip_backtrace(), @r###" - Error during planning: Expected parameter of type Int32, got Float64 at index 0 + Error during planning: Expected parameter of type Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, got Float64 at index 0 "### ); } @@ -265,12 +265,12 @@ fn test_prepare_statement_to_plan_params_as_constants() { assert_snapshot!( plan, @r#" - Prepare: "my_plan" [Int32] + Prepare: "my_plan" 
[Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: $1
         EmptyRelation: rows=1
     "#
     );
-    assert_snapshot!(dt, @r#"Int32"#);
+    assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);

     ///////////////////
     // replace params with values
@@ -290,12 +290,12 @@ fn test_prepare_statement_to_plan_params_as_constants() {
     assert_snapshot!(
         plan,
         @r#"
-    Prepare: "my_plan" [Int32]
+    Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: Int64(1) + $1
         EmptyRelation: rows=1
     "#
     );
-    assert_snapshot!(dt, @r#"Int32"#);
+    assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);

     ///////////////////
     // replace params with values
@@ -315,12 +315,12 @@ fn test_prepare_statement_to_plan_params_as_constants() {
     assert_snapshot!(
         plan,
         @r#"
-    Prepare: "my_plan" [Int32, Float64]
+    Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: Int64(1) + $1 + $2
         EmptyRelation: rows=1
     "#
     );
-    assert_snapshot!(dt, @r#"Int32, Float64"#);
+    assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);

     ///////////////////
     // replace params with values
@@ -376,7 +376,7 @@ fn test_prepare_statement_infer_types_from_join() {
         test.run(),
         @r#"
     ** Initial Plan:
-    Prepare: "my_plan" [Int32]
+    Prepare: "my_plan" [Field { name: "age", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: person.id, orders.order_id
         Inner Join:  Filter: person.id = orders.customer_id AND person.age = $1
           TableScan: person
@@ -424,7 +424,7 @@ fn test_prepare_statement_infer_types_from_predicate() {
         test.run(),
         @r#"
     ** Initial Plan:
-    Prepare: "my_plan" [Int32]
+    Prepare: "my_plan" [Field { name: "age", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: person.id, person.age
         Filter: person.age = $1
           TableScan: person
@@ -476,7 +476,7 @@ fn test_prepare_statement_infer_types_from_between_predicate() {
         test.run(),
         @r#"
     ** Initial Plan:
-    Prepare: "my_plan" [Int32, Int32]
+    Prepare: "my_plan" [Field { name: "age", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "age", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: person.id, person.age
         Filter: person.age BETWEEN $1 AND $2
           TableScan: person
@@ -533,7 +533,7 @@ fn test_prepare_statement_infer_types_subquery() {
         test.run(),
         @r#"
     ** Initial Plan:
-    Prepare: "my_plan" [UInt32]
+    Prepare: "my_plan" [Field { name: "id", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]
       Projection: person.id, person.age
         Filter: person.age = (<subquery>)
           Subquery:
@@ -598,7 +598,7 @@ fn test_prepare_statement_update_infer() {
         test.run(),
         @r#"
     ** Initial Plan:
-    Prepare: "my_plan" [Int32, UInt32]
+    Prepare: "my_plan" [Field { name: "age", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "id", data_type: UInt32, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }] Dml: op=[Update] table=[person] Projection: person.id AS id, person.first_name AS first_name, person.last_name AS last_name, $1 AS age, person.state AS state, person.salary AS salary, person.birth_date AS birth_date, person.😀 AS 😀 Filter: person.id = $2 @@ -662,7 +662,7 @@ fn test_prepare_statement_insert_infer() { test.run(), @r#" ** Initial Plan: - Prepare: "my_plan" [UInt32, Utf8, Utf8] + Prepare: "my_plan" [Field { name: "id", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "first_name", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "last_name", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] Dml: op=[Insert Into] table=[person] Projection: column1 AS id, column2 AS first_name, column3 AS last_name, CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀 Values: ($1, $2, $3) @@ -681,13 +681,13 @@ fn test_prepare_statement_to_plan_one_param() { assert_snapshot!( plan, @r#" - Prepare: "my_plan" [Int32] + Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] Projection: person.id, person.age Filter: person.age = $1 TableScan: person "# ); - assert_snapshot!(dt, @r#"Int32"#); + assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#); /////////////////// // replace params with values @@ -714,13 +714,13 @@ fn test_prepare_statement_to_plan_data_type() { // age is defined as Int32 but prepare statement declares it as DOUBLE/Float64 // Prepare statement and its logical plan should be created successfully @r#" - Prepare: "my_plan" [Float64] + Prepare: "my_plan" [Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] Projection: person.id, person.age Filter: person.age = $1 TableScan: person "# ); - assert_snapshot!(dt, @r#"Float64"#); + assert_snapshot!(dt, @r#"Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#); /////////////////// // replace params with values still succeed and use Float64 @@ -747,13 +747,13 @@ fn test_prepare_statement_to_plan_multi_params() { assert_snapshot!( plan, @r#" - Prepare: "my_plan" [Int32, Utf8View, Float64, Int32, Float64, Utf8View] + Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] Projection: person.id, person.age, $6 Filter: person.age IN ([$1, $4]) AND person.salary > $3 AND person.salary < $5 OR person.first_name < $2 TableScan: person "# ); - assert_snapshot!(dt, @r#"Int32, Utf8View, Float64, Int32, Float64, Utf8View"#); + assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Utf8View, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#); /////////////////// // replace params with values @@ -790,7 +790,7 @@ fn test_prepare_statement_to_plan_having() { assert_snapshot!( plan, @r#" - Prepare: "my_plan" [Int32, Float64, Float64, Float64] + Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] Projection: person.id, sum(person.age) Filter: sum(person.age) < $1 AND sum(person.age) > Int64(10) OR sum(person.age) IN ([$3, $4]) Aggregate: groupBy=[[person.id]], aggr=[[sum(person.age)]] @@ -798,7 +798,7 @@ fn test_prepare_statement_to_plan_having() { TableScan: person "# ); - assert_snapshot!(dt, @r#"Int32, Float64, Float64, Float64"#); + assert_snapshot!(dt, @r#"Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#); /////////////////// // replace params with values @@ -831,13 +831,13 @@ fn test_prepare_statement_to_plan_limit() { assert_snapshot!( plan, @r#" - Prepare: "my_plan" [Int64, Int64] + Prepare: "my_plan" [Field { name: "", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] Limit: skip=$1, fetch=$2 Projection: person.id TableScan: person "# ); - assert_snapshot!(dt, @r#"Int64, Int64"#); + assert_snapshot!(dt, @r#"Field { name: "", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#); // replace params with values let param_values = vec![ScalarValue::Int64(Some(10)), ScalarValue::Int64(Some(200))]; diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt index d61603ae6558..084e717dc2d7 100644 --- a/datafusion/sqllogictest/test_files/prepare.slt +++ b/datafusion/sqllogictest/test_files/prepare.slt @@ -289,7 +289,7 @@ query TT EXPLAIN PREPARE my_plan(INT) AS SELECT id + $1 FROM person; ---- logical_plan -01)Prepare: "my_plan" [Int32] +01)Prepare: "my_plan" [Field { name: "", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] 02)--Projection: person.id + $1 03)----TableScan: person projection=[id]
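
A minimal sketch of what the new representation enables, assuming the
`Placeholder::new_with_metadata` constructor inferred from the call site in
value.rs above (its exact signature may differ):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_expr::expr::Placeholder;

fn main() {
    // A parameter field carrying extension metadata that a bare DataType
    // could not represent.
    let field: FieldRef = Arc::new(
        Field::new("", DataType::Utf8, true).with_metadata(HashMap::from([(
            "ARROW:extension:name".to_string(),
            "arrow.json".to_string(),
        )])),
    );

    // The placeholder now stores the whole FieldRef, so the metadata
    // survives planning instead of being dropped with the DataType.
    let placeholder = Placeholder::new_with_metadata("$1".to_string(), Some(field));
    println!("{placeholder:?}");
}
```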