Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod file_options;
pub mod format;
pub mod hash_utils;
pub mod instant;
pub mod metadata;
pub mod nested_struct;
mod null_equality;
pub mod parsers;
Expand Down
250 changes: 250 additions & 0 deletions datafusion/common/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{collections::BTreeMap, sync::Arc};

use arrow::datatypes::Field;
use hashbrown::HashMap;

/// Literal metadata
///
/// Stores metadata associated with a literal expressions
/// and is designed to be fast to `clone`.
///
/// This structure is used to store metadata associated with a literal expression, and it
/// corresponds to the `metadata` field on [`Field`].
///
/// # Example: Create [`FieldMetadata`] from a [`Field`]
/// ```
/// # use std::collections::HashMap;
/// # use datafusion_common::metadata::FieldMetadata;
/// # use arrow::datatypes::{Field, DataType};
/// # let field = Field::new("c1", DataType::Int32, true)
/// # .with_metadata(HashMap::from([("foo".to_string(), "bar".to_string())]));
/// // Create a new `FieldMetadata` instance from a `Field`
/// let metadata = FieldMetadata::new_from_field(&field);
/// // There is also a `From` impl:
/// let metadata = FieldMetadata::from(&field);
/// ```
///
/// # Example: Update a [`Field`] with [`FieldMetadata`]
/// ```
/// # use datafusion_common::metadata::FieldMetadata;
/// # use arrow::datatypes::{Field, DataType};
/// # let field = Field::new("c1", DataType::Int32, true);
/// # let metadata = FieldMetadata::new_from_field(&field);
/// // Add any metadata from `FieldMetadata` to `Field`
/// let updated_field = metadata.add_to_field(field);
/// ```
///
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
pub struct FieldMetadata {
    /// The inner metadata of a literal expression, which is a map of string
    /// keys to string values.
    ///
    /// Note this is not a `HashMap` because `HashMap` does not provide
    /// implementations for traits like `Debug` and `Hash`.
    ///
    /// Wrapping the map in `Arc` keeps `clone` cheap (a refcount bump) no
    /// matter how many entries the metadata holds.
    inner: Arc<BTreeMap<String, String>>,
}

impl Default for FieldMetadata {
fn default() -> Self {
Self::new_empty()
}
}

impl FieldMetadata {
    /// Create a new empty metadata instance.
    pub fn new_empty() -> Self {
        Self {
            inner: Arc::new(BTreeMap::new()),
        }
    }

    /// Merges two optional `FieldMetadata` instances, overwriting any existing
    /// keys in `m` with keys from `n` if present.
    ///
    /// This function is commonly used in alias operations, particularly for literals
    /// with metadata. When creating an alias expression, the metadata from the original
    /// expression (such as a literal) is combined with any metadata specified on the alias.
    ///
    /// # Arguments
    ///
    /// * `m` - The first metadata (typically from the original expression like a literal)
    /// * `n` - The second metadata (typically from the alias definition)
    ///
    /// # Merge Strategy
    ///
    /// - If both metadata instances exist, they are merged with `n` taking precedence
    /// - Keys from `n` will overwrite keys from `m` if they have the same name
    /// - If only one metadata instance exists, it is returned unchanged
    /// - If neither exists, `None` is returned
    ///
    /// # Example usage
    /// ```rust
    /// use datafusion_common::metadata::FieldMetadata;
    /// use std::collections::BTreeMap;
    ///
    /// // Create metadata for a literal expression
    /// let literal_metadata = Some(FieldMetadata::from(BTreeMap::from([
    ///     ("source".to_string(), "constant".to_string()),
    ///     ("type".to_string(), "int".to_string()),
    /// ])));
    ///
    /// // Create metadata for an alias
    /// let alias_metadata = Some(FieldMetadata::from(BTreeMap::from([
    ///     ("description".to_string(), "answer".to_string()),
    ///     ("source".to_string(), "user".to_string()), // This will override literal's "source"
    /// ])));
    ///
    /// // Merge the metadata
    /// let merged = FieldMetadata::merge_options(
    ///     literal_metadata.as_ref(),
    ///     alias_metadata.as_ref(),
    /// );
    ///
    /// // Result contains: {"source": "user", "type": "int", "description": "answer"}
    /// assert!(merged.is_some());
    /// ```
    pub fn merge_options(
        m: Option<&FieldMetadata>,
        n: Option<&FieldMetadata>,
    ) -> Option<FieldMetadata> {
        match (m, n) {
            (Some(m), Some(n)) => {
                let mut merged = m.clone();
                merged.extend(n.clone());
                Some(merged)
            }
            (Some(m), None) => Some(m.clone()),
            (None, Some(n)) => Some(n.clone()),
            (None, None) => None,
        }
    }

    /// Create a new metadata instance from a `Field`'s metadata.
    pub fn new_from_field(field: &Field) -> Self {
        let inner = field
            .metadata()
            .iter()
            // `clone` copies the `String`s directly; `to_string` would route
            // the same copy through the `Display` formatting machinery
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Create a new metadata instance from a map of string keys to string values.
    pub fn new(inner: BTreeMap<String, String>) -> Self {
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Get the inner metadata as a reference to a `BTreeMap`.
    pub fn inner(&self) -> &BTreeMap<String, String> {
        &self.inner
    }

    /// Return the inner metadata
    pub fn into_inner(self) -> Arc<BTreeMap<String, String>> {
        self.inner
    }

    /// Adds metadata from `other` into `self`, overwriting any existing keys.
    pub fn extend(&mut self, other: Self) {
        if other.is_empty() {
            return;
        }
        // `unwrap_or_clone` avoids copying `other` when it is the sole owner;
        // `make_mut` applies copy-on-write to `self.inner` for shared Arcs.
        let other = Arc::unwrap_or_clone(other.into_inner());
        Arc::make_mut(&mut self.inner).extend(other);
    }

    /// Returns true if the metadata is empty.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the number of key-value pairs in the metadata.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Convert this `FieldMetadata` into a `HashMap<String, String>`
    pub fn to_hashmap(&self) -> std::collections::HashMap<String, String> {
        self.inner
            .iter()
            // direct `String` clones; no `Display` round-trip
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    /// Updates the metadata on the Field with this metadata, if it is not empty.
    ///
    /// Returns `field` unchanged when this metadata is empty, so an empty map
    /// never clobbers metadata already present on the field.
    pub fn add_to_field(&self, field: Field) -> Field {
        if self.inner.is_empty() {
            return field;
        }

        field.with_metadata(self.to_hashmap())
    }
}

impl From<&Field> for FieldMetadata {
fn from(field: &Field) -> Self {
Self::new_from_field(field)
}
}

impl From<BTreeMap<String, String>> for FieldMetadata {
fn from(inner: BTreeMap<String, String>) -> Self {
Self::new(inner)
}
}

impl From<std::collections::HashMap<String, String>> for FieldMetadata {
fn from(map: std::collections::HashMap<String, String>) -> Self {
Self::new(map.into_iter().collect())
}
}

/// From reference
impl From<&std::collections::HashMap<String, String>> for FieldMetadata {
    /// Copies the entries of a borrowed `HashMap` into a new sorted metadata map.
    fn from(map: &std::collections::HashMap<String, String>) -> Self {
        let inner = map
            .iter()
            // `clone` copies the `String`s directly; `to_string` would route
            // the same copy through the `Display` formatting machinery
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        Self::new(inner)
    }
}

/// From hashbrown map
impl From<HashMap<String, String>> for FieldMetadata {
fn from(map: HashMap<String, String>) -> Self {
let inner = map.into_iter().collect();
Self::new(inner)
}
}

impl From<&HashMap<String, String>> for FieldMetadata {
    /// Copies the entries of a borrowed hashbrown map into a new sorted metadata map.
    fn from(map: &HashMap<String, String>) -> Self {
        // Use `iter()` rather than `into_iter()` on a reference (clippy's
        // `into_iter_on_ref`: they are the same iterator, but `iter()` states
        // the intent), and `clone` rather than `to_string` to copy the
        // `String`s without the `Display` round-trip.
        let inner = map
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        Self::new(inner)
    }
}
45 changes: 33 additions & 12 deletions datafusion/common/src/param_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,23 @@
// under the License.

use crate::error::{_plan_datafusion_err, _plan_err};
use crate::metadata::FieldMetadata;
use crate::{Result, ScalarValue};
use arrow::datatypes::DataType;
use arrow::datatypes::FieldRef;
use std::collections::HashMap;

/// The parameter value corresponding to the placeholder
#[derive(Debug, Clone)]
pub enum ParamValues {
/// For positional query parameters, like `SELECT * FROM test WHERE a > $1 AND b = $2`
List(Vec<ScalarValue>),
List(Vec<(ScalarValue, Option<FieldMetadata>)>),
/// For named query parameters, like `SELECT * FROM test WHERE a > $foo AND b = $goo`
Map(HashMap<String, ScalarValue>),
Map(HashMap<String, (ScalarValue, Option<FieldMetadata>)>),
}

impl ParamValues {
/// Verify parameter list length and type
pub fn verify(&self, expect: &[DataType]) -> Result<()> {
pub fn verify(&self, expect: &[FieldRef]) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one thing that would be nice to help people upgrade could be to add a new function and deprecate this one -- perhaps something like suggested in https://datafusion.apache.org/contributor-guide/api-health.html#api-health-policy

    #[deprecated]
    pub fn verify(&self, expect: &[DataType]) -> Result<()> {
      // make dummy Fields
      let expect = ...;
      self.verify_fields(&expect)
     }

    // new function that has the new signature
    pub fn verify_fields(&self, expect: &[FieldRef]) -> Result<()> {
    ...
    }

match self {
ParamValues::List(list) => {
// Verify if the number of params matches the number of values
Expand All @@ -45,15 +46,28 @@ impl ParamValues {

// Verify if the types of the params matches the types of the values
let iter = expect.iter().zip(list.iter());
for (i, (param_type, value)) in iter.enumerate() {
if *param_type != value.data_type() {
for (i, (param_type, (value, maybe_metadata))) in iter.enumerate() {
if *param_type.data_type() != value.data_type() {
return _plan_err!(
"Expected parameter of type {}, got {:?} at index {}",
param_type,
value.data_type(),
i
);
}

if let Some(expected_metadata) = maybe_metadata {
// Probably too strict of a comparison (this is an example of where
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree straight up comparing strings is probably not ideal

If we wanted to introduce type equality, I thing the bigger question is how to thread it through (you would have to have some way to register your types / methods to check equality and ensure that somehow ended up here 🤔 )

// the concept of type equality would be useful)
if &expected_metadata.to_hashmap() != param_type.metadata() {
return _plan_err!(
"Expected parameter with metadata {:?}, got {:?} at index {}",
expected_metadata,
param_type.metadata(),
i
);
}
}
}
Ok(())
}
Expand All @@ -65,7 +79,10 @@ impl ParamValues {
}
}

pub fn get_placeholders_with_values(&self, id: &str) -> Result<ScalarValue> {
pub fn get_placeholders_with_values(
&self,
id: &str,
) -> Result<(ScalarValue, Option<FieldMetadata>)> {
match self {
ParamValues::List(list) => {
if id.is_empty() {
Expand Down Expand Up @@ -99,7 +116,7 @@ impl ParamValues {

impl From<Vec<ScalarValue>> for ParamValues {
fn from(value: Vec<ScalarValue>) -> Self {
Self::List(value)
Self::List(value.into_iter().map(|v| (v, None)).collect())
}
}

Expand All @@ -108,8 +125,10 @@ where
K: Into<String>,
{
fn from(value: Vec<(K, ScalarValue)>) -> Self {
let value: HashMap<String, ScalarValue> =
value.into_iter().map(|(k, v)| (k.into(), v)).collect();
let value: HashMap<String, (ScalarValue, Option<FieldMetadata>)> = value
.into_iter()
.map(|(k, v)| (k.into(), (v, None)))
.collect();
Self::Map(value)
}
}
Expand All @@ -119,8 +138,10 @@ where
K: Into<String>,
{
fn from(value: HashMap<K, ScalarValue>) -> Self {
let value: HashMap<String, ScalarValue> =
value.into_iter().map(|(k, v)| (k.into(), v)).collect();
let value: HashMap<String, (ScalarValue, Option<FieldMetadata>)> = value
.into_iter()
.map(|(k, v)| (k.into(), (v, None)))
.collect();
Self::Map(value)
}
}
11 changes: 8 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use datafusion_catalog::{
DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory,
};
use datafusion_common::config::ConfigOptions;
use datafusion_common::metadata::FieldMetadata;
use datafusion_common::{
config::{ConfigExtension, TableOptions},
exec_datafusion_err, exec_err, internal_datafusion_err, not_impl_err,
Expand Down Expand Up @@ -1238,10 +1239,10 @@ impl SessionContext {
})?;

// Only allow literals as parameters for now.
let mut params: Vec<ScalarValue> = parameters
let mut params: Vec<(ScalarValue, Option<FieldMetadata>)> = parameters
.into_iter()
.map(|e| match e {
Expr::Literal(scalar, _) => Ok(scalar),
Expr::Literal(scalar, metadata) => Ok((scalar, metadata)),
_ => not_impl_err!("Unsupported parameter type: {}", e),
})
.collect::<Result<_>>()?;
Expand All @@ -1259,7 +1260,11 @@ impl SessionContext {
params = params
.into_iter()
.zip(prepared.data_types.iter())
.map(|(e, dt)| e.cast_to(dt))
.map(|(e, dt)| -> Result<_> {
// This is fishy...we're casting storage without checking if an
// extension type supports the destination
Ok((e.0.cast_to(dt.data_type())?, e.1))
})
.collect::<Result<_>>()?;
}

Expand Down
Loading