Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 1 addition & 21 deletions datafusion/common/src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,26 +327,6 @@ impl Column {
}
}

impl From<&str> for Column {
fn from(c: &str) -> Self {
Self::from_qualified_name(c)
}
}

/// Create a column, cloning the string
impl From<&String> for Column {
fn from(c: &String) -> Self {
Self::from_qualified_name(c)
}
}

/// Create a column, reusing the existing string
impl From<String> for Column {
fn from(c: String) -> Self {
Self::from_qualified_name(c)
}
}

/// Create a column, use qualifier and field name
impl From<(Option<&TableReference>, &Field)> for Column {
fn from((relation, field): (Option<&TableReference>, &Field)) -> Self {
Expand All @@ -366,7 +346,7 @@ impl std::str::FromStr for Column {
type Err = std::convert::Infallible;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(s.into())
Ok(Self::from_qualified_name(s))
}
}

Expand Down
13 changes: 8 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ impl DataFrame {
columns: &[&str],
options: UnnestOptions,
) -> Result<DataFrame> {
let columns = columns.iter().map(|c| Column::from(*c)).collect();
let columns = columns.iter().map(|c| Column::from_qualified_name(*c)).collect();
let plan = LogicalPlanBuilder::from(self.plan)
.unnest_columns_with_options(columns, options)?
.build()?;
Expand Down Expand Up @@ -1257,7 +1257,10 @@ impl DataFrame {
.join(
right.plan,
join_type,
(left_cols.to_vec(), right_cols.to_vec()),
(
left_cols.iter().map(|c| Column::from_qualified_name(*c)).collect(),
right_cols.iter().map(|c| Column::from_qualified_name(*c)).collect(),
),
filter,
)?
.build()?;
Expand Down Expand Up @@ -2167,7 +2170,7 @@ impl DataFrame {
col_exists = true;
Some((new_column.clone(), true))
} else {
let e = col(Column::from((qualifier, field)));
let e = Expr::Column(Column::from((qualifier, field)));
Some((e, self.projection_requires_validation))
}
})
Expand Down Expand Up @@ -2245,12 +2248,12 @@ impl DataFrame {
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field == field_rename {
(
col(Column::from((qualifier, field)))
Expr::Column(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name),
false,
)
} else {
(col(Column::from((qualifier, field))), false)
(Expr::Column(Column::from((qualifier, field))), false)
}
})
.collect::<Vec<_>>();
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ mod tests {
ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
};
use datafusion_common::{
DataFusionError, Result, ScalarValue, assert_contains,
Column, DataFusionError, Result, ScalarValue, assert_contains,
stats::Precision,
test_util::{batches_to_string, datafusion_test_data},
};
Expand Down Expand Up @@ -776,7 +776,7 @@ mod tests {
)]));

let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
Box::new(Expr::Column("column1".into())),
Box::new(Expr::Column(Column::from_qualified_name("column1"))),
Operator::GtEq,
Box::new(Expr::Literal(ScalarValue::Int32(Some(0)), None)),
));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1852,7 +1852,7 @@ impl Expr {
/// # use datafusion_common::Column;
/// use datafusion_expr::{col, Expr};
/// let expr = col("foo");
/// assert_eq!(expr.try_as_col(), Some(&Column::from("foo")));
/// assert_eq!(expr.try_as_col(), Some(&Column::from_qualified_name("foo")));
///
/// let expr = col("foo").alias("bar");
/// assert_eq!(expr.try_as_col(), None);
Expand Down
10 changes: 5 additions & 5 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@ use std::sync::Arc;
/// let c3 = col(r#""A""#);
/// assert_ne!(c1, c3);
/// ```
pub fn col(ident: impl Into<Column>) -> Expr {
Expr::Column(ident.into())
pub fn col(ident: impl Into<String>) -> Expr {
Expr::Column(Column::from_qualified_name(ident))
}

/// Create an out reference column which hold a reference that has been resolved to a field
/// outside of the current plan.
/// The expression created by this function does not preserve the metadata of the outer column.
/// Please use `out_ref_col_with_metadata` if you want to preserve the metadata.
pub fn out_ref_col(dt: DataType, ident: impl Into<Column>) -> Expr {
pub fn out_ref_col(dt: DataType, ident: impl Into<String>) -> Expr {
out_ref_col_with_metadata(dt, HashMap::new(), ident)
}

/// Create an out reference column from an existing field (preserving metadata)
pub fn out_ref_col_with_metadata(
dt: DataType,
metadata: HashMap<String, String>,
ident: impl Into<Column>,
ident: impl Into<String>,
) -> Expr {
let column = ident.into();
let column = Column::from_qualified_name(ident);
let field: FieldRef =
Arc::new(Field::new(column.name(), dt, true).with_metadata(metadata));
Expr::OuterReferenceColumn(field, column)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ mod tests {
let outer_ref = out_ref_col_with_metadata(
DataType::Int32,
meta.to_hashmap(),
Column::from_name("foo"),
"foo",
);
assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
}
Expand Down
20 changes: 10 additions & 10 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1500,19 +1500,19 @@ impl LogicalPlanBuilder {
}

/// Unnest the given column.
pub fn unnest_column(self, column: impl Into<Column>) -> Result<Self> {
unnest(Arc::unwrap_or_clone(self.plan), vec![column.into()]).map(Self::new)
pub fn unnest_column(self, column: impl Into<String>) -> Result<Self> {
unnest(Arc::unwrap_or_clone(self.plan), vec![Column::from_qualified_name(column)]).map(Self::new)
}

/// Unnest the given column given [`UnnestOptions`]
pub fn unnest_column_with_options(
self,
column: impl Into<Column>,
column: impl Into<String>,
options: UnnestOptions,
) -> Result<Self> {
unnest_with_options(
Arc::unwrap_or_clone(self.plan),
vec![column.into()],
vec![Column::from_qualified_name(column)],
options,
)
.map(Self::new)
Expand Down Expand Up @@ -2664,7 +2664,7 @@ mod tests {
// Unnesting multiple fields at the same time, using infer syntax
let cols = vec!["strings", "structs", "struct_singular"]
.into_iter()
.map(|c| c.into())
.map(Column::from_qualified_name)
.collect();

let plan = nested_table_scan("test_table")?
Expand All @@ -2683,16 +2683,16 @@ mod tests {
// Simultaneously unnesting a list (with different depth) and a struct column
let plan = nested_table_scan("test_table")?
.unnest_columns_with_options(
vec!["stringss".into(), "struct_singular".into()],
vec![Column::from_qualified_name("stringss"), Column::from_qualified_name("struct_singular")],
UnnestOptions::default()
.with_recursions(RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_1".into(),
input_column: Column::from_qualified_name("stringss"),
output_column: Column::from_qualified_name("stringss_depth_1"),
depth: 1,
})
.with_recursions(RecursionUnnestOption {
input_column: "stringss".into(),
output_column: "stringss_depth_2".into(),
input_column: Column::from_qualified_name("stringss"),
output_column: Column::from_qualified_name("stringss_depth_2"),
depth: 2,
}),
)?
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4989,7 +4989,7 @@ mod tests {
let col = schema.field_names()[0].clone();

let filter = Filter::try_new(
Expr::Column(col.into()).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
Expr::Column(Column::from_qualified_name(col)).eq(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
scan,
)
.unwrap();
Expand Down Expand Up @@ -5019,7 +5019,7 @@ mod tests {
let col = schema.field_names()[0].clone();

let filter =
Filter::try_new(Expr::Column(col.into()).eq(lit(1i32)), scan).unwrap();
Filter::try_new(Expr::Column(Column::from_qualified_name(col)).eq(lit(1i32)), scan).unwrap();
assert!(filter.is_scalar());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ fn grouping_function_on_id(
}
};

let grouping_id_column = Expr::Column(Column::from(Aggregate::INTERNAL_GROUPING_ID));
let grouping_id_column = Expr::Column(Column::from_qualified_name(Aggregate::INTERNAL_GROUPING_ID));
// The grouping call is exactly our internal grouping id
if args.len() == group_by_expr_count
&& args
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ mod test {
let table_scan_1 = test_table_scan_with_name("test1").unwrap();
let table_scan_2 = test_table_scan_with_name("test2").unwrap();
let join = LogicalPlanBuilder::from(table_scan_1)
.join(table_scan_2, JoinType::Inner, (vec!["a"], vec!["a"]), None)
.join(table_scan_2, JoinType::Inner, (vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("a")]), None)
.unwrap()
.build()
.unwrap();
Expand Down
5 changes: 3 additions & 2 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ mod tests {
use crate::optimizer::OptimizerContext;
use crate::test::*;

use datafusion_common::Column;
use datafusion_expr::{
Operator::{And, Or},
binary_expr, col, lit,
Expand Down Expand Up @@ -654,10 +655,10 @@ mod tests {
.join(
t3,
JoinType::Inner,
(vec!["t1.a"], vec!["t3.a"]),
(vec![Column::from_qualified_name("t1.a")], vec![Column::from_qualified_name("t3.a")]),
Some(col("t1.a").gt(lit(20u32))),
)?
.join(t2, JoinType::Inner, (vec!["t1.a"], vec!["t2.a"]), None)?
.join(t2, JoinType::Inner, (vec![Column::from_qualified_name("t1.a")], vec![Column::from_qualified_name("t2.a")]), None)?
.filter(col("t1.a").gt(lit(15u32)))?
.build()?;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ mod tests {
.join(
t2,
JoinType::Inner,
(vec!["optional_id"], vec!["t2.optional_id"]),
(vec![Column::from_qualified_name("optional_id")], vec![Column::from_qualified_name("t2.optional_id")]),
None,
)?
.build()?;
Expand Down
10 changes: 5 additions & 5 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1734,9 +1734,9 @@ mod tests {
let plan = table_scan(Some("m4"), &schema, None)?
.aggregate(
Vec::<Expr>::new(),
vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
vec![max(Expr::Column(Column::new_unqualified("tag.one"))).alias("tag.one")],
)?
.project([col(Column::new_unqualified("tag.one"))])?
.project([Expr::Column(Column::new_unqualified("tag.one"))])?
.build()?;

assert_optimized_plan_equal!(
Expand Down Expand Up @@ -1842,7 +1842,7 @@ mod tests {
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;

let plan = LogicalPlanBuilder::from(table_scan)
.join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
.join(table2_scan, JoinType::Left, (vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("c1")]), None)?
.project(vec![col("a"), col("b"), col("c1")])?
.build()?;

Expand Down Expand Up @@ -1894,7 +1894,7 @@ mod tests {
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;

let plan = LogicalPlanBuilder::from(table_scan)
.join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
.join(table2_scan, JoinType::Left, (vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("c1")]), None)?
// projecting joined column `a` should push the right side column `c1` projection as
// well into test2 table even though `c1` is not referenced in projection.
.project(vec![col("a"), col("b")])?
Expand Down Expand Up @@ -1949,7 +1949,7 @@ mod tests {
let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;

let plan = LogicalPlanBuilder::from(table_scan)
.join_using(table2_scan, JoinType::Left, vec!["a".into()])?
.join_using(table2_scan, JoinType::Left, vec![Column::from_qualified_name("a")])?
.project(vec![col("a"), col("b")])?
.build()?;

Expand Down
12 changes: 6 additions & 6 deletions datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ mod test {
use crate::test::*;

use crate::OptimizerContext;
use datafusion_common::DFSchemaRef;
use datafusion_common::{Column, DFSchemaRef};
use datafusion_expr::{
Expr, Extension, UserDefinedLogicalNodeCore, col, exists,
logical_plan::builder::LogicalPlanBuilder,
Expand Down Expand Up @@ -837,7 +837,7 @@ mod test {
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::Inner,
(vec!["a"], vec!["a"]),
(vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("a")]),
None,
)?
.limit(10, Some(1000))?
Expand All @@ -864,7 +864,7 @@ mod test {
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::Inner,
(vec!["a"], vec!["a"]),
(vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("a")]),
None,
)?
.limit(10, Some(1000))?
Expand Down Expand Up @@ -955,7 +955,7 @@ mod test {
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::Left,
(vec!["a"], vec!["a"]),
(vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("a")]),
None,
)?
.limit(10, Some(1000))?
Expand Down Expand Up @@ -983,7 +983,7 @@ mod test {
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::Right,
(vec!["a"], vec!["a"]),
(vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("a")]),
None,
)?
.limit(0, Some(1000))?
Expand Down Expand Up @@ -1011,7 +1011,7 @@ mod test {
.join(
LogicalPlanBuilder::from(table_scan_2).build()?,
JoinType::Right,
(vec!["a"], vec!["a"]),
(vec![Column::from_qualified_name("a")], vec![Column::from_qualified_name("a")]),
None,
)?
.limit(10, Some(1000))?
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use datafusion_common::{Column, Result};
use datafusion_expr::expr_rewriter::normalize_cols;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
use datafusion_expr::{ExprFunctionExt, Limit, LogicalPlanBuilder, col, lit};
use datafusion_expr::{ExprFunctionExt, Limit, LogicalPlanBuilder, lit};

/// Optimizer that replaces logical [[Distinct]] with a logical [[Aggregate]]
///
Expand Down Expand Up @@ -179,7 +179,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
.skip(expr_cnt)
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
col(Column::from((new_qualifier, new_field)))
Expr::Column(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name())
})
.collect::<Vec<Expr>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ mod tests {

// Test that it still extracts from direct column references
let col_expr = col("a");
assert_eq!(extract_column_from_expr(&col_expr), Some(Column::from("a")));
assert_eq!(extract_column_from_expr(&col_expr), Some(Column::from_qualified_name("a")));
}

#[test]
Expand Down
Loading
Loading