Skip to content

Commit a970b91

Browse files
committed
test: Add hash_agg_aggregation_strategy_with_nongrouped_single_value_columns_in_sort_key test
1 parent 4cf1a82 commit a970b91

File tree

1 file changed

+68
-1
lines changed

1 file changed

+68
-1
lines changed

datafusion/src/physical_plan/planner.rs

Lines changed: 68 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1758,7 +1758,8 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
17581758
#[cfg(test)]
17591759
mod tests {
17601760
use super::*;
1761-
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
1761+
use crate::logical_plan::{and, DFField, DFSchema, DFSchemaRef};
1762+
use crate::physical_plan::OptimizerHints;
17621763
use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning};
17631764
use crate::scalar::ScalarValue;
17641765
use crate::{
@@ -2041,6 +2042,72 @@ mod tests {
20412042
Ok(())
20422043
}
20432044

2045+
/// Checks that `input_sorted_by_group_key` reports the input as sorted when the group key
/// ("c1" through "c5") is a prefix of the input's sort key ("c1" through "c8") and the
/// input also reports single-value columns.
///
/// The input plan scans `aggregate_test_100.csv`, applies equality filters on "c4" and
/// "c8", and sorts on "c1" through "c8". The test first asserts the `output_hints()` of the
/// resulting execution plan, then asserts that the group key is reported as sorted with
/// sort order `[0, 1, 2, 3, 4]`.
#[test]
2046+
fn hash_agg_aggregation_strategy_with_nongrouped_single_value_columns_in_sort_key(
2047+
) -> Result<()> {
2048+
let testdata = crate::test_util::arrow_test_data();
2049+
let path = format!("{}/csv/aggregate_test_100.csv", testdata);
2050+
2051+
let options = CsvReadOptions::new().schema_infer_max_records(100);
2052+
2053+
// Builds a sort expression on `column_name`, passing `true` for both flags of `sort`
// (presumably ascending and nulls-first -- confirm against `Expr::sort`).
fn sort(column_name: &str) -> Expr {
2054+
col(column_name).sort(true, true)
2055+
}
2056+
2057+
// Rather than writing a mock ExecutionPlan, build a real input plan whose output_hints()
// have the desired shape: equality filters on "c4" and "c8" followed by a sort on "c1".."c8".
2058+
let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
2059+
.filter(and(
2060+
col("c4").eq(lit("value_a")),
2061+
col("c8").eq(lit("value_b")),
2062+
))?
2063+
.sort(vec![
2064+
sort("c1"),
2065+
sort("c2"),
2066+
sort("c3"),
2067+
sort("c4"),
2068+
sort("c5"),
2069+
sort("c6"),
2070+
sort("c7"),
2071+
sort("c8"),
2072+
])?
2073+
.build()?;
2074+
2075+
let execution_plan = plan(&logical_plan)?;
2076+
2077+
// Both single_value_columns (3 and 7) are part of the sort key, but only column 3 will be
// part of the group key built below.
2078+
let hints: OptimizerHints = execution_plan.output_hints();
2079+
assert_eq!(hints.sort_order, Some(vec![0, 1, 2, 3, 4, 5, 6, 7]));
2080+
assert_eq!(hints.single_value_columns, vec![3, 7]);
2081+
2082+
// Build a group_key ("c1".."c5", i.e. columns 0..=4) that includes single-value column 3
2083+
// but not single-value column 7; columns 5 and 6 ("c6" and "c7") lie in between.
2084+
let group_key = vec![col("c1"), col("c2"), col("c3"), col("c4"), col("c5")];
2085+
let mut ctx_state = make_ctx_state();
2086+
ctx_state.config.concurrency = 4;
2087+
let planner = DefaultPhysicalPlanner::default();
2088+
let mut physical_group_key = Vec::new();
2089+
for expr in group_key {
2090+
let phys_expr = planner.create_physical_expr(
2091+
&expr,
2092+
&logical_plan.schema(),
2093+
&execution_plan.schema(),
2094+
&ctx_state,
2095+
)?;
2096+
// The second tuple element is left as an empty string (presumably an output name that
// this check does not use -- confirm against `input_sorted_by_group_key`).
physical_group_key.push((phys_expr, "".to_owned()));
2097+
}
2098+
2099+
// `sort_order` is passed as an out-parameter and checked after the call.
let mut sort_order = Vec::<usize>::new();
2100+
let is_sorted: bool = input_sorted_by_group_key(
2101+
execution_plan.as_ref(),
2102+
&physical_group_key,
2103+
&mut sort_order,
2104+
);
2105+
assert!(is_sorted);
2106+
assert_eq!(sort_order, vec![0, 1, 2, 3, 4]);
2107+
2108+
Ok(())
2109+
}
2110+
20442111
#[test]
20452112
fn test_explain() {
20462113
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);

0 commit comments

Comments
 (0)