Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1417,9 +1417,7 @@ mod tests {
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
let arrow_schema = schema.as_arrow();
let expected = "Field { name: \"c0\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
Field { name: \"c1\", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }";
assert_eq!(expected, arrow_schema.to_string());
insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { name: "c0", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by changing this to assert_snapshot it is easier to update in the arrow-57 upgrade, where this message changes

Ok(())
}

Expand Down
45 changes: 9 additions & 36 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2486,7 +2486,7 @@ mod tests {
// the cast here is implicit so has CastOptions with safe=true
let expected = r#"BinaryExpr { left: Column { name: "c7", index: 2 }, op: Lt, right: Literal { value: Int64(5), field: Field { name: "lit", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }"#;

assert!(format!("{exec_plan:?}").contains(expected));
assert_contains!(format!("{exec_plan:?}"), expected);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise this makes it easier to update the test as now the actual output is also printed

Ok(())
}

Expand All @@ -2510,9 +2510,7 @@ mod tests {
&session_state,
);

let expected = r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[false, false, false], [true, false, false], [false, true, false], [false, false, true], [true, true, false], [true, false, true], [false, true, true], [true, true, true]] })"#;

assert_eq!(format!("{cube:?}"), expected);
insta::assert_snapshot!(format!("{cube:?}"), @r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[false, false, false], [true, false, false], [false, true, false], [false, false, true], [true, true, false], [true, false, true], [false, true, true], [true, true, true]] })"#);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, you can also use assert_debug_snapshot!(cube, ...) although it would not be compact


Ok(())
}
Expand All @@ -2537,9 +2535,7 @@ mod tests {
&session_state,
);

let expected = r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[true, true, true], [false, true, true], [false, false, true], [false, false, false]] })"#;

assert_eq!(format!("{rollup:?}"), expected);
insta::assert_snapshot!(format!("{rollup:?}"), @r#"Ok(PhysicalGroupBy { expr: [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], null_expr: [(Literal { value: Utf8(NULL), field: Field { name: "lit", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c1"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c2"), (Literal { value: Int64(NULL), field: Field { name: "lit", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} } }, "c3")], groups: [[true, true, true], [false, true, true], [false, false, true], [false, false, false]] })"#);

Ok(())
}
Expand Down Expand Up @@ -2677,35 +2673,13 @@ mod tests {
let logical_plan = LogicalPlan::Extension(Extension {
node: Arc::new(NoOpExtensionNode::default()),
});
let plan = planner
let e = planner
.create_physical_plan(&logical_plan, &session_state)
.await;
.await
.expect_err("planning error")
.strip_backtrace();

let expected_error: &str = "Error during planning: \
Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
LogicalPlan schema: \
DFSchema { inner: Schema { fields: \
[Field { name: \"a\", \
data_type: Int32, \
nullable: false, \
dict_id: 0, \
dict_is_ordered: false, metadata: {} }], \
metadata: {} }, field_qualifiers: [None], \
functional_dependencies: FunctionalDependencies { deps: [] } }, \
ExecutionPlan schema: Schema { fields: \
[Field { name: \"b\", \
data_type: Int32, \
nullable: false, \
dict_id: 0, \
dict_is_ordered: false, metadata: {} }], \
metadata: {} }";
match plan {
Ok(_) => panic!("Expected planning failure"),
Err(e) => assert!(
e.to_string().contains(expected_error),
"Error '{e}' did not contain expected error '{expected_error}'"
),
}
insta::assert_snapshot!(e, @r#"Error during planning: Extension planner for NoOp created an ExecutionPlan with mismatched schema. LogicalPlan schema: DFSchema { inner: Schema { fields: [Field { name: "a", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, ExecutionPlan schema: Schema { fields: [Field { name: "b", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }"#);
}

#[tokio::test]
Expand All @@ -2723,8 +2697,7 @@ mod tests {

let expected = "expr: [ProjectionExpr { expr: BinaryExpr { left: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"a\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, op: Or, right: BinaryExpr { left: Column { name: \"c1\", index: 0 }, op: Eq, right: Literal { value: Utf8(\"1\"), field: Field { name: \"lit\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, fail_on_overflow: false }, fail_on_overflow: false }";

let actual = format!("{execution_plan:?}");
assert!(actual.contains(expected), "{}", actual);
assert_contains!(format!("{execution_plan:?}"), expected);

Ok(())
}
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/tests/sql/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,12 @@ async fn test_parameter_invalid_types() -> Result<()> {
.with_param_values(vec![ScalarValue::from(4_i32)])?
.collect()
.await;
assert_eq!(
results.unwrap_err().strip_backtrace(),
"type_coercion\ncaused by\nError during planning: Cannot infer common argument type for comparison operation List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) = Int32"
);
assert_snapshot!(results.unwrap_err().strip_backtrace(),
@r#"
type_coercion
caused by
Error during planning: Cannot infer common argument type for comparison operation List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) = Int32
"#);
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ fn test_date_timestamp_arithmetic_error() -> Result<()> {
&DataType::Timestamp(TimeUnit::Millisecond, None),
)
.get_input_types()?;
assert_eq!(lhs.to_string(), "Timestamp(Millisecond, None)");
assert_eq!(rhs.to_string(), "Timestamp(Millisecond, None)");
assert_eq!(lhs, DataType::Timestamp(TimeUnit::Millisecond, None));
assert_eq!(rhs, DataType::Timestamp(TimeUnit::Millisecond, None));

let err =
BinaryTypeCoercer::new(&DataType::Date32, &Operator::Plus, &DataType::Date64)
Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ mod tests {
},
datatypes::*,
};
use datafusion_common::assert_contains;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use insta::assert_snapshot;

// runs an end-to-end test of physical type cast
// 1. construct a record batch with a column "a" of type A
Expand Down Expand Up @@ -438,11 +438,8 @@ mod tests {
)?;
let expression =
cast_with_options(col("a", &schema)?, &schema, Decimal128(6, 2), None)?;
let e = expression.evaluate(&batch).unwrap_err(); // panics on OK
assert_contains!(
e.to_string(),
"Arrow error: Invalid argument error: 12345679 is too large to store in a Decimal128 of precision 6. Max is 999999"
);
let e = expression.evaluate(&batch).unwrap_err().strip_backtrace(); // panics on OK
assert_snapshot!(e, @"Arrow error: Invalid argument error: 12345679 is too large to store in a Decimal128 of precision 6. Max is 999999");

let expression_safe = cast_with_options(
col("a", &schema)?,
Expand Down
42 changes: 31 additions & 11 deletions datafusion/physical-expr/src/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,13 @@ pub fn in_list(

#[cfg(test)]
mod tests {

use super::*;
use crate::expressions;
use crate::expressions::{col, lit, try_cast};
use datafusion_common::plan_err;
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use insta::assert_snapshot;
use itertools::Itertools as _;

type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn PhysicalExpr>>);
Expand Down Expand Up @@ -1456,7 +1456,7 @@ mod tests {
}

#[test]
fn test_fmt_sql() -> Result<()> {
fn test_fmt_sql_1() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let col_a = col("a", &schema)?;

Expand All @@ -1465,33 +1465,53 @@ mod tests {
let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
let sql_string = fmt_sql(expr.as_ref()).to_string();
let display_string = expr.to_string();
assert_eq!(sql_string, "a IN (a, b)");
assert_eq!(display_string, "a@0 IN (SET) ([a, b])");
assert_snapshot!(sql_string, @"a IN (a, b)");
assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b])");
Ok(())
}

#[test]
fn test_fmt_sql_2() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let col_a = col("a", &schema)?;

// Test: a NOT IN ('a', 'b')
let list = vec![lit("a"), lit("b")];
let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
let sql_string = fmt_sql(expr.as_ref()).to_string();
let display_string = expr.to_string();
assert_eq!(sql_string, "a NOT IN (a, b)");
assert_eq!(display_string, "a@0 NOT IN (SET) ([a, b])");

assert_snapshot!(sql_string, @"a NOT IN (a, b)");
assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b])");
Ok(())
}

#[test]
fn test_fmt_sql_3() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let col_a = col("a", &schema)?;
// Test: a IN ('a', 'b', NULL)
let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?;
let sql_string = fmt_sql(expr.as_ref()).to_string();
let display_string = expr.to_string();
assert_eq!(sql_string, "a IN (a, b, NULL)");
assert_eq!(display_string, "a@0 IN (SET) ([a, b, NULL])");

assert_snapshot!(sql_string, @"a IN (a, b, NULL)");
assert_snapshot!(display_string, @"a@0 IN (SET) ([a, b, NULL])");
Ok(())
}

#[test]
fn test_fmt_sql_4() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let col_a = col("a", &schema)?;
// Test: a NOT IN ('a', 'b', NULL)
let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))];
let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?;
let sql_string = fmt_sql(expr.as_ref()).to_string();
let display_string = expr.to_string();
assert_eq!(sql_string, "a NOT IN (a, b, NULL)");
assert_eq!(display_string, "a@0 NOT IN (SET) ([a, b, NULL])");

assert_snapshot!(sql_string, @"a NOT IN (a, b, NULL)");
assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b, NULL])");
Ok(())
}
}
33 changes: 12 additions & 21 deletions datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ mod tests {
use crate::windows::{
create_udwf_window_expr, create_window_expr, BoundedWindowAggExec, InputOrderMode,
};
use crate::{execute_stream, get_plan_string, ExecutionPlan};
use crate::{displayable, execute_stream, ExecutionPlan};

use arrow::array::{
builder::{Int64Builder, UInt64Builder},
Expand Down Expand Up @@ -1695,16 +1695,12 @@ mod tests {

let batches = collect(physical_plan.execute(0, task_ctx)?).await?;

let expected = vec![
"BoundedWindowAggExec: wdw=[last: Field { name: \"last\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-1): Field { name: \"nth_value(-1)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-2): Field { name: \"nth_value(-2)\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]",
" DataSourceExec: partitions=1, partition_sizes=[3]",
];
// Get string representation of the plan
let actual = get_plan_string(&physical_plan);
assert_eq!(
expected, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
assert_snapshot!(formatted, @r#"
BoundedWindowAggExec: wdw=[last: Field { name: "last", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-1): Field { name: "nth_value(-1)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, nth_value(-2): Field { name: "nth_value(-2)", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
DataSourceExec: partitions=1, partition_sizes=[3]
"#);

assert_snapshot!(batches_to_string(&batches), @r#"
+---+------+---------------+---------------+
Expand Down Expand Up @@ -1817,18 +1813,13 @@ mod tests {

let plan = projection_exec(window)?;

let expected_plan = vec![
"ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]",
" BoundedWindowAggExec: wdw=[count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]: Field { name: \"count([Column { name: \\\"sn\\\", index: 0 }]) PARTITION BY: [[Column { name: \\\"hash\\\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \\\"sn\\\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING], mode=[Linear]",
" StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]",
];

// Get string representation of the plan
let actual = get_plan_string(&plan);
assert_eq!(
expected_plan, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_plan:#?}\nactual:\n\n{actual:#?}\n\n"
);
let actual = displayable(plan.as_ref()).indent(true).to_string();
assert_snapshot!(actual, @r#"
ProjectionExec: expr=[sn@0 as sn, hash@1 as hash, count([Column { name: "sn", index: 0 }]) PARTITION BY: [[Column { name: "hash", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: "sn", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]@2 as col_2]
BoundedWindowAggExec: wdw=[count([Column { name: "sn", index: 0 }]) PARTITION BY: [[Column { name: "hash", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: "sn", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]: Field { name: "count([Column { name: \"sn\", index: 0 }]) PARTITION BY: [[Column { name: \"hash\", index: 1 }]], ORDER BY: [[PhysicalSortExpr { expr: Column { name: \"sn\", index: 0 }, options: SortOptions { descending: false, nulls_first: true } }]]", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING], mode=[Linear]
StreamingTableExec: partition_sizes=1, projection=[sn, hash], infinite_source=true, output_ordering=[sn@0 ASC NULLS LAST]
"#);

let task_ctx = task_context();
let batches = collect_with_timeout(plan, task_ctx, timeout_duration).await?;
Expand Down