From c68cce9af4b996327fd4bcfb21784b7b43a18875 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Jun 2025 18:19:11 +0100 Subject: [PATCH 1/8] Support table sampling Given SAMPLE and TABLESAMPLE parsed SQL, add a logical plan filter based on sample quantity. For example, `select COUNT(*) from data TABLESAMPLE SYSTEM 42` produces a filter `random() < 0.42`. --- datafusion/sql/Cargo.toml | 1 + datafusion/sql/src/select.rs | 144 +++++++++++++++++++++++++++++++++-- 2 files changed, 139 insertions(+), 6 deletions(-) diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index b778db46769d..e5ff251943f0 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -48,6 +48,7 @@ arrow = { workspace = true } bigdecimal = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-functions = { workspace = true, default-features = true } indexmap = { workspace = true } log = { workspace = true } recursive = { workspace = true, optional = true } diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 9fad274b51c0..403b7450b358 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -31,7 +31,9 @@ use datafusion_common::error::DataFusionErrorBuilder; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{not_impl_err, plan_err, Result}; use datafusion_common::{RecursionUnnestOption, UnnestOptions}; -use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions}; +use datafusion_expr::expr::{ + Alias, PlannedReplaceSelectItem, ScalarFunction, WildcardOptions, +}; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts, }; @@ -40,14 +42,17 @@ use datafusion_expr::utils::{ expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ - Aggregate, Expr, Filter, GroupingSet, 
LogicalPlan, LogicalPlanBuilder, - LogicalPlanBuilderOptions, Partitioning, + lit, Aggregate, BinaryExpr, Expr, Filter, GroupingSet, LogicalPlan, + LogicalPlanBuilder, LogicalPlanBuilderOptions, Operator, Partitioning, ScalarUDF, }; +use datafusion_functions::math::random::RandomFunc; use indexmap::IndexMap; use sqlparser::ast::{ visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, - OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType, + OrderBy, SelectItemQualifiedWildcardKind, TableFactor, TableSampleKind, + TableSampleModifier, TableSampleQuantity, TableSampleUnit, WildcardAdditionalOptions, + WindowType, }; use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins}; @@ -77,11 +82,29 @@ impl SqlToRel<'_, S> { } // Process `from` clause - let plan = self.plan_from_tables(select.from, planner_context)?; + let plan = self.plan_from_tables(select.from.clone(), planner_context)?; let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_)); // Process `where` clause - let base_plan = self.plan_selection(select.selection, plan, planner_context)?; + let mut base_plan = + self.plan_selection(select.selection, plan, planner_context)?; + + // Now `base_plan` is a LogicalPlan::Filter + if let Some(from_first) = select.from.first() { + if let TableFactor::Table { + sample: Some(sample), + .. 
+ } = &from_first.relation + { + // Rewrite SAMPLE / TABLESAMPLE clause to additional filters + // TODO: handle samples from different queries + base_plan = self.sample_to_where_random_clause( + base_plan, + sample, + planner_context, + )?; + } + } // Handle named windows before processing the projection expression check_conflicting_windows(&select.named_window)?; @@ -541,6 +564,115 @@ impl SqlToRel<'_, S> { } } + /// Extract an expression for table sample quantity + fn sample_quanitity_value( + &self, + quantity: &TableSampleQuantity, + planner_context: &mut PlannerContext, + ) -> Result<Expr> { + match &quantity.value { + // Support only numeric literals now + SQLExpr::Value(value_with_span) => Ok(self.parse_value( + value_with_span.value.clone(), + planner_context.prepare_param_data_types(), + )?), + _ => not_impl_err!( + "Table quantity value {:?} is not supported", + &quantity.value + ), + } + } + + /// Compose expression for TABLE SAMPLE filter + fn sample_to_threshold_expr( + &self, + table_sample_kind: &TableSampleKind, + planner_context: &mut PlannerContext, + ) -> Result<Box<Expr>> { + // Support both before and after syntax + let table_sample = match table_sample_kind { + // Hive syntax + TableSampleKind::BeforeTableAlias(kind) => kind, + // Standard syntax + TableSampleKind::AfterTableAlias(kind) => kind, + }; + + // These features are not part of a common SQL specification, + // not implemented yet + if table_sample.seed.is_some() { + return not_impl_err!("Table sample seed is not supported"); + } + if table_sample.bucket.is_some() { + return not_impl_err!("Table sample bucket is not supported"); + } + if table_sample.offset.is_some() { + return not_impl_err!("Table sample offset is not supported"); + } + + if let Some(table_sample_quantity) = &table_sample.quantity { + match table_sample_quantity.unit { + Some(TableSampleUnit::Rows) => { + not_impl_err!("Table sample with rows unit is not supported") + } + Some(TableSampleUnit::Percent) | None => { + // Extract 
quantity from SQLExpr + let quantity_value: Expr = self + .sample_quanitity_value(table_sample_quantity, planner_context)?; + + let ratio: Expr = match table_sample.modifier { + TableSampleModifier::TableSample => + // SELECT * FROM tbl TABLESAMPLE SYSTEM (10), + // Value is percentage + { + Expr::BinaryExpr(BinaryExpr::new( + Box::new(quantity_value), + Operator::Divide, + Box::new(lit(100.0)), + )) + } + TableSampleModifier::Sample => + // SELECT * FROM tbl SAMPLE 0.1 + // Value is floating ratio, pass as is + { + quantity_value + } + }; + + let random_threshold = Box::new(ratio); + Ok(random_threshold) + } + } + } else { + plan_err!("Table sample quantity must be specified") + } + } + + /// Compose a logical plan with a static Filter based on TABLE SAMPLE expression + fn sample_to_where_random_clause( + &self, + plan: LogicalPlan, + sample_kind: &TableSampleKind, + planner_context: &mut PlannerContext, + ) -> Result<LogicalPlan> { + // `random()` call + let random_udf = ScalarUDF::new_from_impl(RandomFunc::new()); + let random_expr_call = Box::new(Expr::ScalarFunction(ScalarFunction::new_udf( + Arc::new(random_udf), + vec![], + ))); + let random_threshold = + self.sample_to_threshold_expr(sample_kind, planner_context)?; + // New filter predicate: `random() < 0.1` + let predicate = Expr::BinaryExpr(BinaryExpr::new( + random_expr_call, + Operator::Lt, + random_threshold, + )); + let random_filter = Filter::try_new(predicate, Arc::new(plan)); + + Ok(LogicalPlan::Filter(random_filter?)) + } + pub(crate) fn plan_from_tables( &self, mut from: Vec<TableWithJoins>, From dc1d3265aa512b9cb87e5010d20344171ca019c0 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Jun 2025 18:19:31 +0100 Subject: [PATCH 2/8] Add unit tests for SAMPLE and TABLESAMPLE --- datafusion/sql/tests/sql_integration.rs | 112 ++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 4be7953aefc0..626d990f52ac 100644 --- 
a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4714,3 +4714,115 @@ fn test_using_join_wildcard_schema() { ] ); } + +#[test] +fn select_tablesample_value() { + let sql = "SELECT count(*) + FROM person + TABLESAMPLE 42 + WHERE id > 5"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" + Projection: count(*) + Aggregate: groupBy=[[]], aggr=[[count(*)]] + Filter: random() < Int64(42) / Float64(100) + Filter: person.id > Int64(5) + TableScan: person + "# + ); +} + +#[test] +fn select_tablesample_value_float() { + let sql = "SELECT count(*) + FROM person + TABLESAMPLE 42.3 + WHERE id > 5"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" + Projection: count(*) + Aggregate: groupBy=[[]], aggr=[[count(*)]] + Filter: random() < Float64(42.3) / Float64(100) + Filter: person.id > Int64(5) + TableScan: person + "# + ); +} + +#[test] +fn select_tablesample_percent() { + let sql = "SELECT count(*) + FROM person + TABLESAMPLE SYSTEM (42 PERCENT) + WHERE id > 5"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" + Projection: count(*) + Aggregate: groupBy=[[]], aggr=[[count(*)]] + Filter: random() < Int64(42) / Float64(100) + Filter: person.id > Int64(5) + TableScan: person + "# + ); +} + +#[test] +fn select_sample() { + let sql = "SELECT count(*) + FROM person + SAMPLE 0.42 + WHERE id > 5"; + let plan = logical_plan(sql).unwrap(); + assert_snapshot!( + plan, + @r#" + Projection: count(*) + Aggregate: groupBy=[[]], aggr=[[count(*)]] + Filter: random() < Float64(0.42) + Filter: person.id > Int64(5) + TableScan: person + "# + ); +} + +#[test] +fn select_sample_rows_unsupported() { + let sql = "SELECT count(*) + FROM person + TABLESAMPLE (5 ROWS)"; + let err = logical_plan(sql); + assert_contains!( + err.unwrap_err().to_string(), + "Table sample with rows unit is not supported" + ); +} + +#[test] +fn select_sample_bucket_unsupported() { + let sql = "SELECT 
count(*) + FROM person + TABLESAMPLE (BUCKET 3 OUT OF 16 ON id)"; + let err = logical_plan(sql); + assert_contains!( + err.unwrap_err().to_string(), + "Table sample bucket is not supported" + ); +} + +#[test] +fn select_sample_seed_unsupported() { + let sql = "SELECT count(*) + FROM person + TABLESAMPLE SYSTEM (3) REPEATABLE (82)"; + let err = logical_plan(sql); + assert_contains!( + err.unwrap_err().to_string(), + "Table sample seed is not supported" + ); +} From af543503e5892598d6c016f5ef2986354695d6de Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 7 Jun 2025 18:19:48 +0100 Subject: [PATCH 3/8] Add regression sqllogictest test cases --- datafusion/sqllogictest/test_files/select.slt | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index aa14faf984e4..4f88dcaaa15c 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1869,5 +1869,48 @@ select *, count(*) over() as ta from t; 3 4 1 4 +# table sample random +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (42) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)] +02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------ProjectionExec: expr=[] +07)------------CoalesceBatchesExec: target_batch_size=8192 +08)--------------FilterExec: a@0 < 10 AND random() < 0.42 +09)----------------DataSourceExec: partitions=1, partition_sizes=[1] + +# sample random +query TT 
+EXPLAIN SELECT COUNT(*) from t SAMPLE 0.42 WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)] +02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------ProjectionExec: expr=[] +07)------------CoalesceBatchesExec: target_batch_size=8192 +08)--------------FilterExec: a@0 < 10 AND random() < 0.42 +09)----------------DataSourceExec: partitions=1, partition_sizes=[1] + + statement count 0 drop table t; From 43e66f7ca0f8ed7aedd13e2418de4c78093cbf2a Mon Sep 17 00:00:00 2001 From: theirix Date: Thu, 12 Jun 2025 23:06:21 +0100 Subject: [PATCH 4/8] Move tablesample tests to a new file --- datafusion/sqllogictest/test_files/select.slt | 43 ----- .../sqllogictest/test_files/tablesample.slt | 149 ++++++++++++++++++ 2 files changed, 149 insertions(+), 43 deletions(-) create mode 100644 datafusion/sqllogictest/test_files/tablesample.slt diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index 4f88dcaaa15c..aa14faf984e4 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ b/datafusion/sqllogictest/test_files/select.slt @@ -1869,48 +1869,5 @@ select *, count(*) over() as ta from t; 3 4 1 4 -# table sample random -query TT -EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (42) WHERE a < 10; ----- -logical_plan -01)Projection: count(Int64(1)) AS count(*) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] -03)----Projection: -04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) -05)--------TableScan: t projection=[a] -physical_plan 
-01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)] -02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] -03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] -05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -06)----------ProjectionExec: expr=[] -07)------------CoalesceBatchesExec: target_batch_size=8192 -08)--------------FilterExec: a@0 < 10 AND random() < 0.42 -09)----------------DataSourceExec: partitions=1, partition_sizes=[1] - -# sample random -query TT -EXPLAIN SELECT COUNT(*) from t SAMPLE 0.42 WHERE a < 10; ----- -logical_plan -01)Projection: count(Int64(1)) AS count(*) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] -03)----Projection: -04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) -05)--------TableScan: t projection=[a] -physical_plan -01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)] -02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] -03)----CoalescePartitionsExec -04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] -05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -06)----------ProjectionExec: expr=[] -07)------------CoalesceBatchesExec: target_batch_size=8192 -08)--------------FilterExec: a@0 < 10 AND random() < 0.42 -09)----------------DataSourceExec: partitions=1, partition_sizes=[1] - - statement count 0 drop table t; diff --git a/datafusion/sqllogictest/test_files/tablesample.slt b/datafusion/sqllogictest/test_files/tablesample.slt new file mode 100644 index 000000000000..f5603be840ff --- /dev/null +++ b/datafusion/sqllogictest/test_files/tablesample.slt @@ -0,0 +1,149 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +create table t(a int) as values (1), (2), (3), (1); + +statement ok +create table t2(a int, b int) as values (1, 10), (2, 20), (3, 30), (4, 40); + +query II +select *, count(*) over() as ta from t; +---- +1 4 +2 4 +3 4 +1 4 + +statement ok +set datafusion.explain.logical_plan_only = true; + +# tablesample value +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE 42 WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] + + +# tablesample value float +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE 42.3 WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.423) +05)--------TableScan: t projection=[a] + + +# tablesample system(value) +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (42) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] + +# tablesample system percent +query TT +EXPLAIN SELECT COUNT(*) from t 
TABLESAMPLE SYSTEM (42 PERCENT) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] + + +# sample random +query TT +EXPLAIN SELECT COUNT(*) from t SAMPLE 0.42 WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] + + +query error DataFusion error: This feature is not implemented: Table sample with rows unit is not supported +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE (5 ROWS); + + +query error DataFusion error: This feature is not implemented: Table sample bucket is not supported +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE (BUCKET 3 OUT OF 16 ON id) + + +query error DataFusion error: This feature is not implemented: Table sample seed is not supported +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (3) REPEATABLE (82) + + +# smoke test +query III +SELECT t.a, t2.a, t2.b FROM t JOIN t2 on t.a = t2.a; +---- +1 1 10 +1 1 10 +2 2 20 +3 3 30 + +# multiple tables with join +# sampling is applied only to the first table +query TT +EXPLAIN SELECT COUNT(*) from t SAMPLE 0.42 JOIN t2 TABLESAMPLE 10 PERCENT on t.a = t2.a; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Inner Join: t.a = t2.a +05)--------Filter: random() < Float64(0.42) +06)----------TableScan: t projection=[a] +07)--------TableScan: t2 projection=[a] + +# multiple tables with subquery +query TT +EXPLAIN SELECT COUNT(*) from t SAMPLE 0.42 WHERE a IN (SELECT b from t2 TABLESAMPLE 10 PERCENT) and a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], 
aggr=[[count(Int64(1))]] +03)----Projection: +04)------LeftSemi Join: t.a = __correlated_sq_1.b +05)--------Filter: t.a < Int32(10) AND random() < Float64(0.42) +06)----------TableScan: t projection=[a] +07)--------SubqueryAlias: __correlated_sq_1 +08)----------Filter: random() < Float64(0.1) +09)------------TableScan: t2 projection=[b] + +statement ok +set datafusion.explain.logical_plan_only = false; + +statement count 0 +drop table t; From 123f4bbfed56102e7b4861f7e459fc1d03a69795 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 14 Jun 2025 10:36:13 +0100 Subject: [PATCH 5/8] Remove unit tests from sql_integration --- datafusion/sql/tests/sql_integration.rs | 112 ------------------------ 1 file changed, 112 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 626d990f52ac..4be7953aefc0 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4714,115 +4714,3 @@ fn test_using_join_wildcard_schema() { ] ); } - -#[test] -fn select_tablesample_value() { - let sql = "SELECT count(*) - FROM person - TABLESAMPLE 42 - WHERE id > 5"; - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - plan, - @r#" - Projection: count(*) - Aggregate: groupBy=[[]], aggr=[[count(*)]] - Filter: random() < Int64(42) / Float64(100) - Filter: person.id > Int64(5) - TableScan: person - "# - ); -} - -#[test] -fn select_tablesample_value_float() { - let sql = "SELECT count(*) - FROM person - TABLESAMPLE 42.3 - WHERE id > 5"; - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - plan, - @r#" - Projection: count(*) - Aggregate: groupBy=[[]], aggr=[[count(*)]] - Filter: random() < Float64(42.3) / Float64(100) - Filter: person.id > Int64(5) - TableScan: person - "# - ); -} - -#[test] -fn select_tablesample_percent() { - let sql = "SELECT count(*) - FROM person - TABLESAMPLE SYSTEM (42 PERCENT) - WHERE id > 5"; - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - 
plan, - @r#" - Projection: count(*) - Aggregate: groupBy=[[]], aggr=[[count(*)]] - Filter: random() < Int64(42) / Float64(100) - Filter: person.id > Int64(5) - TableScan: person - "# - ); -} - -#[test] -fn select_sample() { - let sql = "SELECT count(*) - FROM person - SAMPLE 0.42 - WHERE id > 5"; - let plan = logical_plan(sql).unwrap(); - assert_snapshot!( - plan, - @r#" - Projection: count(*) - Aggregate: groupBy=[[]], aggr=[[count(*)]] - Filter: random() < Float64(0.42) - Filter: person.id > Int64(5) - TableScan: person - "# - ); -} - -#[test] -fn select_sample_rows_unsupported() { - let sql = "SELECT count(*) - FROM person - TABLESAMPLE (5 ROWS)"; - let err = logical_plan(sql); - assert_contains!( - err.unwrap_err().to_string(), - "Table sample with rows unit is not supported" - ); -} - -#[test] -fn select_sample_bucket_unsupported() { - let sql = "SELECT count(*) - FROM person - TABLESAMPLE (BUCKET 3 OUT OF 16 ON id)"; - let err = logical_plan(sql); - assert_contains!( - err.unwrap_err().to_string(), - "Table sample bucket is not supported" - ); -} - -#[test] -fn select_sample_seed_unsupported() { - let sql = "SELECT count(*) - FROM person - TABLESAMPLE SYSTEM (3) REPEATABLE (82)"; - let err = logical_plan(sql); - assert_contains!( - err.unwrap_err().to_string(), - "Table sample seed is not supported" - ); -} From 9d5c68100f9bd99f4f169f62ea3f499e56a214a1 Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 14 Jun 2025 11:27:06 +0100 Subject: [PATCH 6/8] Add sampling explanations --- datafusion/sql/src/select.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 403b7450b358..f69bddfe921d 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -97,7 +97,7 @@ impl SqlToRel<'_, S> { } = &from_first.relation { // Rewrite SAMPLE / TABLESAMPLE clause to additional filters - // TODO: handle samples from different queries + // TODO: handle samples from joined 
tables base_plan = self.sample_to_where_random_clause( base_plan, sample, @@ -612,9 +612,16 @@ impl SqlToRel<'_, S> { if let Some(table_sample_quantity) = &table_sample.quantity { match table_sample_quantity.unit { Some(TableSampleUnit::Rows) => { + // Fixed size row sampling is not supported not_impl_err!("Table sample with rows unit is not supported") } Some(TableSampleUnit::Percent) | None => { + // There are two flavors of sampling (`TableSampleMethod`): + // - Block-level sampling (SYSTEM or BLOCK keywords) + // - Row-level sampling (BERNOULLI or ROW keywords) + // `random()` filter pushdown allows only block-level sampling, not row-level. + // However, BERNOULLI/ROW are not forbidden: every method is planned as the same `random()` filter. + // Extract quantity from SQLExpr let quantity_value: Expr = self .sample_quanitity_value(table_sample_quantity, planner_context)?; From e4cba7fc60359f0deb1aaa0928aea93327456fef Mon Sep 17 00:00:00 2001 From: theirix Date: Sat, 14 Jun 2025 12:52:39 +0100 Subject: [PATCH 7/8] More samples for tablesample coverage --- .../sqllogictest/test_files/tablesample.slt | 72 ++++++++++++++++++- 1 file changed, 69 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/tablesample.slt b/datafusion/sqllogictest/test_files/tablesample.slt index f5603be840ff..8731360c2df8 100644 --- a/datafusion/sqllogictest/test_files/tablesample.slt +++ b/datafusion/sqllogictest/test_files/tablesample.slt @@ -79,6 +79,28 @@ logical_plan 04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) 05)--------TableScan: t projection=[a] +# tablesample block(value) +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE BLOCK (42) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] + +# tablesample after alias +query TT +EXPLAIN SELECT COUNT(*) from t as talias TABLESAMPLE SYSTEM 
(42) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----SubqueryAlias: talias +04)------Projection: +05)--------Filter: t.a < Int32(10) AND random() < Float64(0.42) +06)----------TableScan: t projection=[a] # sample random query TT @@ -91,20 +113,46 @@ logical_plan 04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) 05)--------TableScan: t projection=[a] +# tablesample system percent with BERNOULLI method +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE BERNOULLI (42 PERCENT) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] +# tablesample system percent with ROW method (percentage), Snowflake syntax +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE ROW (42 PERCENT) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] + +# tablesample system percent with ROW method (rows), Snowflake syntax query error DataFusion error: This feature is not implemented: Table sample with rows unit is not supported -EXPLAIN SELECT COUNT(*) from t TABLESAMPLE (5 ROWS); +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE ROW (20 ROWS) WHERE a < 10; +# unsupported: fixed row sampling +query error DataFusion error: This feature is not implemented: Table sample with rows unit is not supported +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE (5 ROWS); +# unsupported: buckets query error DataFusion error: This feature is not implemented: Table sample bucket is not supported EXPLAIN SELECT COUNT(*) from t TABLESAMPLE (BUCKET 3 OUT OF 16 ON id) - +# unsupported: seed query error DataFusion 
error: This feature is not implemented: Table sample seed is not supported EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (3) REPEATABLE (82) -# smoke test +# smoke test for joining tables query III SELECT t.a, t2.a, t2.b FROM t JOIN t2 on t.a = t2.a; ---- @@ -142,6 +190,24 @@ logical_plan 08)----------Filter: random() < Float64(0.1) 09)------------TableScan: t2 projection=[b] +statement ok +set datafusion.sql_parser.dialect = 'Hive'; + +# tablesample before alias, Hive syntax +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (42) as talias WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----SubqueryAlias: talias +04)------Projection: +05)--------Filter: t.a < Int32(10) AND random() < Float64(0.42) +06)----------TableScan: t projection=[a] + +statement ok +set datafusion.sql_parser.dialect = 'Generic'; + statement ok set datafusion.explain.logical_plan_only = false; From a133380ff6a6c84d44d5a0d21c6569c95e5ba6c6 Mon Sep 17 00:00:00 2001 From: theirix Date: Wed, 30 Jul 2025 20:22:31 +0100 Subject: [PATCH 8/8] Verify that generated random filter is not pushed down --- .../sqllogictest/test_files/tablesample.slt | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/sqllogictest/test_files/tablesample.slt b/datafusion/sqllogictest/test_files/tablesample.slt index 8731360c2df8..fe9d3acd8ecc 100644 --- a/datafusion/sqllogictest/test_files/tablesample.slt +++ b/datafusion/sqllogictest/test_files/tablesample.slt @@ -211,5 +211,29 @@ set datafusion.sql_parser.dialect = 'Generic'; statement ok set datafusion.explain.logical_plan_only = false; +# verify that `random()` filter is not pushed down to executor as volatile +statement ok +set datafusion.execution.parquet.pushdown_filters=true; + +query TT +EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (42 PERCENT) WHERE a < 10; +---- +logical_plan +01)Projection: count(Int64(1)) AS count(*) 
+02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] +03)----Projection: +04)------Filter: t.a < Int32(10) AND random() < Float64(0.42) +05)--------TableScan: t projection=[a] +physical_plan +01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)] +02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))] +03)----CoalescePartitionsExec +04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))] +05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +06)----------ProjectionExec: expr=[] +07)------------CoalesceBatchesExec: target_batch_size=8192 +08)--------------FilterExec: a@0 < 10 AND random() < 0.42 +09)----------------DataSourceExec: partitions=1, partition_sizes=[1] + statement count 0 drop table t;