diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 28a2a62d93d2f..d717bc492b764 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1227,6 +1227,7 @@ dependencies = [ "hex", "http-auth-basic", "humansize", + "indexmap 2.10.0", "indoc", "ipc-channel", "itertools 0.11.0", @@ -1973,7 +1974,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.2.6", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -1992,7 +1993,7 @@ dependencies = [ "futures-sink", "futures-util", "http 1.1.0", - "indexmap 2.2.6", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -2020,6 +2021,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" + [[package]] name = "headers" version = "0.3.4" @@ -2302,12 +2309,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.6" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", - "hashbrown 0.14.3", + "hashbrown 0.15.4", ] [[package]] @@ -4641,7 +4648,7 @@ version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ - "indexmap 2.2.6", + "indexmap 2.10.0", "itoa 1.0.1", "ryu", "serde", diff --git a/rust/cubestore/cubestore/Cargo.toml b/rust/cubestore/cubestore/Cargo.toml index 3efdf1813914d..0c053f5d650bb 100644 --- a/rust/cubestore/cubestore/Cargo.toml +++ b/rust/cubestore/cubestore/Cargo.toml @@ -52,6 +52,7 @@ lazy_static = "1.4.0" mockall = "0.8.1" async-std = "0.99" async-stream = "0.3.6" +indexmap = "2.10.0" itertools = "0.11.0" bigdecimal = { version = "0.2.0", features = ["serde"] } # Right now, it's not possible to use the 0.33 release because it has bugs diff --git a/rust/cubestore/cubestore/src/queryplanner/projection_above_limit.rs b/rust/cubestore/cubestore/src/queryplanner/projection_above_limit.rs index 76f901d4722d5..99795f559571b 100644 --- a/rust/cubestore/cubestore/src/queryplanner/projection_above_limit.rs +++ b/rust/cubestore/cubestore/src/queryplanner/projection_above_limit.rs @@ -37,7 +37,7 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result { LogicalPlan::Limit { n, input } => { let schema: &Arc = input.schema(); - let lift_up_result = lift_up_expensive_projections(input, HashSet::new()); + let lift_up_result = lift_up_expensive_projections(input, ColumnRecorder::default()); pal_debug!("lift_up_res: {:?}", lift_up_result); match lift_up_result { Ok((inner_plan, None)) => Ok(LogicalPlan::Limit { @@ -107,8 +107,11 @@ fn projection_above_limit(plan: &LogicalPlan) -> Result { } } +#[derive(Default)] struct ColumnRecorder { - columns: HashSet, + /// We use indexmap IndexSet because we want iteration order to be deterministic and + /// specifically, to match left-to-right insertion order. + columns: indexmap::IndexSet, } impl ExpressionVisitor for ColumnRecorder { @@ -180,21 +183,16 @@ fn looks_expensive(ex: &Expr) -> Result { fn lift_up_expensive_projections( plan: &LogicalPlan, - used_columns: HashSet, + used_columns: ColumnRecorder, ) -> Result<(LogicalPlan, Option>)> { match plan { LogicalPlan::Sort { expr, input } => { - let mut recorder = ColumnRecorder { - columns: used_columns, - }; + let mut recorder = used_columns; for ex in expr { recorder = ex.accept(recorder)?; } - let used_columns = recorder.columns; - - let (new_input, lifted_projection) = - lift_up_expensive_projections(&input, used_columns)?; + let (new_input, lifted_projection) = lift_up_expensive_projections(&input, recorder)?; pal_debug!( "Sort sees result:\n{:?};;;{:?};;;", new_input, @@ -213,9 +211,7 @@ fn lift_up_expensive_projections( input, schema, } => { - let mut column_recorder = ColumnRecorder { - columns: HashSet::new(), - }; + let mut column_recorder = ColumnRecorder::default(); let mut this_projection_exprs = Vec::::new(); @@ -241,7 +237,7 @@ fn lift_up_expensive_projections( already_retained_cols.push((col.clone(), Some(alias.clone()))); } - if used_columns.contains(&field.qualified_column()) { + if used_columns.columns.contains(&field.qualified_column()) { pal_debug!( "Expr {}: used_columns contains field {:?}", i, @@ -510,6 +506,44 @@ mod tests { Ok(()) } + /// Tests that multiple columns are retained in a deterministic order (and as a nice-to-have, + /// they should be in the left-to-right order of appearance). + #[test] + fn limit_sorted_plan_with_expensive_expr_retaining_multiple_columns() -> Result<()> { + let table_scan = test_table_scan_abcd()?; + + let case_expr = when(col("d").eq(lit(3)), col("c") + lit(2)).otherwise(lit(5))?; + + let plan = LogicalPlanBuilder::from(table_scan) + .project([ + col("a").alias("a1"), + col("b").alias("b1"), + case_expr.alias("c1"), + ])? + .sort([col("a1").sort(true, true)])? + .limit(50)? + .build()?; + + let expected = "Limit: 50\ + \n Sort: #a1 ASC NULLS FIRST\ + \n Projection: #test.a AS a1, #test.b AS b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\ + \n TableScan: test projection=None"; + + let formatted = format!("{:?}", plan); + assert_eq!(formatted, expected); + + // We are testing that test.d deterministically comes before test.c in the inner Projection. + let optimized_expected = "Projection: #a1, #b1, CASE WHEN #test.d Eq Int32(3) THEN #test.c Plus Int32(2) ELSE Int32(5) END AS c1\ + \n Limit: 50\ + \n Sort: #a1 ASC NULLS FIRST\ + \n Projection: #test.a AS a1, #test.b AS b1, #test.d, #test.c\ + \n TableScan: test projection=None"; + + assert_optimized_plan_eq(&plan, optimized_expected); + + Ok(()) + } + /// Tests that we re-alias fields in the lifted up projection. #[test] fn limit_sorted_plan_with_nonaliased_expensive_expr_optimized() -> Result<()> { @@ -659,4 +693,15 @@ mod tests { pub fn test_table_scan() -> Result { test_table_scan_with_name("test") } + + pub fn test_table_scan_abcd() -> Result { + let name = "test"; + let schema = Schema::new(vec![ + Field::new("a", DataType::UInt32, false), + Field::new("b", DataType::UInt32, false), + Field::new("c", DataType::UInt32, false), + Field::new("d", DataType::UInt32, false), + ]); + LogicalPlanBuilder::scan_empty(Some(name), &schema, None)?.build() + } }