Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ arrow = { workspace = true }
bigdecimal = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true, default-features = true }
indexmap = { workspace = true }
log = { workspace = true }
recursive = { workspace = true, optional = true }
Expand Down
144 changes: 138 additions & 6 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr::{
Alias, PlannedReplaceSelectItem, ScalarFunction, WildcardOptions,
};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
};
Expand All @@ -40,14 +42,17 @@ use datafusion_expr::utils::{
expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
LogicalPlanBuilderOptions, Partitioning,
lit, Aggregate, BinaryExpr, Expr, Filter, GroupingSet, LogicalPlan,
LogicalPlanBuilder, LogicalPlanBuilderOptions, Operator, Partitioning, ScalarUDF,
};
use datafusion_functions::math::random::RandomFunc;

use indexmap::IndexMap;
use sqlparser::ast::{
visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr,
OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType,
OrderBy, SelectItemQualifiedWildcardKind, TableFactor, TableSampleKind,
TableSampleModifier, TableSampleQuantity, TableSampleUnit, WildcardAdditionalOptions,
WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

Expand Down Expand Up @@ -77,11 +82,29 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

// Process `from` clause
let plan = self.plan_from_tables(select.from, planner_context)?;
let plan = self.plan_from_tables(select.from.clone(), planner_context)?;
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));

// Process `where` clause
let base_plan = self.plan_selection(select.selection, plan, planner_context)?;
let mut base_plan =
Copy link
Contributor

@2010YOUY01 2010YOUY01 Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we'd better do this rewrite in a separate logical optimizer rule, to keep the planning code clean. It can be done with a follow-up PR before adding more functionality to scan sampling.
(Unless there's a specific reason to do this during the planning phase — I did notice some rewrites happening during planning, but I'm not sure why.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A "logical optimizer rule" mainly focuses on optimization, so I think it's fair to do this rewrite during the planning phase.

self.plan_selection(select.selection, plan, planner_context)?;

// Now `base_plan` is a LogicalPlan::Filter
if let Some(from_first) = select.from.first() {
if let TableFactor::Table {
sample: Some(sample),
..
} = &from_first.relation
{
// Rewrite SAMPLE / TABLESAMPLE clause to additional filters
// TODO: handle samples from different queries
base_plan = self.sample_to_where_random_clause(
base_plan,
sample,
planner_context,
)?;
}
}

// Handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
Expand Down Expand Up @@ -541,6 +564,115 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}

/// Extract the numeric quantity expression from a `TABLESAMPLE` / `SAMPLE`
/// clause.
///
/// Only plain SQL literal values (e.g. `TABLESAMPLE 42`, `SAMPLE 0.42`) are
/// supported; any other expression form yields a "not implemented" error.
///
/// # Errors
/// Returns a `NotImplemented` error when the quantity is not a literal value.
fn sample_quantity_value(
    &self,
    quantity: &TableSampleQuantity,
    planner_context: &mut PlannerContext,
) -> Result<Expr> {
    match &quantity.value {
        // Support only literal values for now; `parse_value` turns the SQL
        // literal into a DataFusion `Expr`, honoring prepared-statement
        // parameter data types.
        SQLExpr::Value(value_with_span) => self.parse_value(
            value_with_span.value.clone(),
            planner_context.prepare_param_data_types(),
        ),
        _ => not_impl_err!(
            "Table quantity value {:?} is not supported",
            &quantity.value
        ),
    }
}

/// Build the sampling-threshold expression for a `TABLESAMPLE` / `SAMPLE`
/// clause.
///
/// The returned expression becomes the right-hand side of the rewritten
/// predicate `random() < <threshold>`:
/// * `TABLESAMPLE SYSTEM (10)` — the quantity is a percentage, so it is
///   divided by 100 to obtain a ratio.
/// * `SAMPLE 0.1` — the quantity is already a ratio and is passed through.
///
/// # Errors
/// * `NotImplemented` for seed, bucket, or offset options and for `ROWS`
///   quantity units.
/// * A plan error when no quantity is specified.
fn sample_to_threshold_expr(
    &self,
    table_sample_kind: &TableSampleKind,
    planner_context: &mut PlannerContext,
) -> Result<Box<Expr>> {
    // Both syntactic placements carry the same sample description; only the
    // parse position relative to the table alias differs.
    let table_sample = match table_sample_kind {
        // Standard syntax (clause before the table alias)
        TableSampleKind::BeforeTableAlias(kind) => kind,
        // Hive syntax (clause after the table alias)
        TableSampleKind::AfterTableAlias(kind) => kind,
    };

    // These options are not part of a common SQL specification and are not
    // implemented yet.
    if table_sample.seed.is_some() {
        return not_impl_err!("Table sample seed is not supported");
    }
    if table_sample.bucket.is_some() {
        return not_impl_err!("Table sample bucket is not supported");
    }
    if table_sample.offset.is_some() {
        return not_impl_err!("Table sample offset is not supported");
    }

    // A quantity is mandatory; without it there is nothing to sample by.
    let Some(table_sample_quantity) = &table_sample.quantity else {
        return plan_err!("Table sample quantity must be specified");
    };

    match table_sample_quantity.unit {
        Some(TableSampleUnit::Rows) => {
            not_impl_err!("Table sample with rows unit is not supported")
        }
        Some(TableSampleUnit::Percent) | None => {
            // Extract the literal quantity from the SQL expression
            let quantity_value: Expr =
                self.sample_quantity_value(table_sample_quantity, planner_context)?;

            let ratio: Expr = match table_sample.modifier {
                // SELECT * FROM tbl TABLESAMPLE SYSTEM (10):
                // the value is a percentage, normalize it to a ratio
                TableSampleModifier::TableSample => Expr::BinaryExpr(BinaryExpr::new(
                    Box::new(quantity_value),
                    Operator::Divide,
                    Box::new(lit(100.0)),
                )),
                // SELECT * FROM tbl SAMPLE 0.1:
                // the value is already a ratio, pass it through as is
                TableSampleModifier::Sample => quantity_value,
            };

            Ok(Box::new(ratio))
        }
    }
}

/// Wrap `plan` in a `Filter` node implementing TABLE SAMPLE semantics.
///
/// The sample clause is rewritten into the predicate
/// `random() < <threshold>`, where the threshold expression is produced by
/// [`Self::sample_to_threshold_expr`].
fn sample_to_where_random_clause(
    &self,
    plan: LogicalPlan,
    sample_kind: &TableSampleKind,
    planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
    // Left-hand side: a call to the `random()` scalar UDF (no arguments)
    let random_call = Expr::ScalarFunction(ScalarFunction::new_udf(
        Arc::new(ScalarUDF::new_from_impl(RandomFunc::new())),
        vec![],
    ));
    // Right-hand side: the sampling threshold derived from the clause
    let threshold = self.sample_to_threshold_expr(sample_kind, planner_context)?;
    // Resulting filter predicate, e.g. `random() < 0.1`
    let predicate = Expr::BinaryExpr(BinaryExpr::new(
        Box::new(random_call),
        Operator::Lt,
        threshold,
    ));
    Ok(LogicalPlan::Filter(Filter::try_new(
        predicate,
        Arc::new(plan),
    )?))
}

pub(crate) fn plan_from_tables(
&self,
mut from: Vec<TableWithJoins>,
Expand Down
112 changes: 112 additions & 0 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4714,3 +4714,115 @@ fn test_using_join_wildcard_schema() {
]
);
}

#[test]
Copy link
Contributor

@2010YOUY01 2010YOUY01 Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding test structure, I suggest:

  1. Move all of the sql_integration tests to sqllogictest, since .slts are easier to maintain.
    To only show logical plans, you can use
set datafusion.explain.logical_plan_only = true;

# sqllogictest tests

# cleanup
set datafusion.explain.logical_plan_only = false;
  2. Create a separate .slt file for all tests related to TABLESAMPLE

To improve test coverage, I recommend to add the following test cases

  1. Select from multiple tables, and test cases where only some of the tables have a sample / all of the tables have a sample.
  2. Test all sample methods in
    https://github.com/apache/datafusion-sqlparser-rs/blob/84c3a1b325c39c879b68ab712e3b9b3e3e40ed56/src/ast/query.rs#L1475
    and expect error for unimplemented ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworked tests in slt files, covered extra cases.

I have included explanations in the code regarding the handling of system vs row sampling.

The remaining part is handling joins (subqueries work fine, verified with integration tests). I put a TODO about it to address in a future PR.

// A bare integer `TABLESAMPLE <n>` quantity is treated as a percentage:
// the plan gains a `random() < n / 100` filter above the WHERE filter.
fn select_tablesample_value() {
let sql = "SELECT count(*)
FROM person
TABLESAMPLE 42
WHERE id > 5";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r#"
Projection: count(*)
Aggregate: groupBy=[[]], aggr=[[count(*)]]
Filter: random() < Int64(42) / Float64(100)
Filter: person.id > Int64(5)
TableScan: person
"#
);
}

#[test]
// A bare float `TABLESAMPLE <x>` quantity is also treated as a percentage
// and divided by 100 in the rewritten filter.
fn select_tablesample_value_float() {
let sql = "SELECT count(*)
FROM person
TABLESAMPLE 42.3
WHERE id > 5";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r#"
Projection: count(*)
Aggregate: groupBy=[[]], aggr=[[count(*)]]
Filter: random() < Float64(42.3) / Float64(100)
Filter: person.id > Int64(5)
TableScan: person
"#
);
}

#[test]
// Explicit `TABLESAMPLE SYSTEM (n PERCENT)` syntax produces the same
// `random() < n / 100` rewrite as the bare-value form.
fn select_tablesample_percent() {
let sql = "SELECT count(*)
FROM person
TABLESAMPLE SYSTEM (42 PERCENT)
WHERE id > 5";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r#"
Projection: count(*)
Aggregate: groupBy=[[]], aggr=[[count(*)]]
Filter: random() < Int64(42) / Float64(100)
Filter: person.id > Int64(5)
TableScan: person
"#
);
}

#[test]
// `SAMPLE <ratio>` takes the value as a ratio directly (no division by 100),
// so the filter compares `random()` against the literal value.
fn select_sample() {
let sql = "SELECT count(*)
FROM person
SAMPLE 0.42
WHERE id > 5";
let plan = logical_plan(sql).unwrap();
assert_snapshot!(
plan,
@r#"
Projection: count(*)
Aggregate: groupBy=[[]], aggr=[[count(*)]]
Filter: random() < Float64(0.42)
Filter: person.id > Int64(5)
TableScan: person
"#
);
}

#[test]
// `ROWS` sampling units parse but are rejected by the planner with a
// "not implemented" error.
fn select_sample_rows_unsupported() {
    let err = logical_plan(
        "SELECT count(*)
FROM person
TABLESAMPLE (5 ROWS)",
    )
    .unwrap_err();
    assert_contains!(
        err.to_string(),
        "Table sample with rows unit is not supported"
    );
}

#[test]
// Hive-style bucket sampling parses but is rejected by the planner with a
// "not implemented" error.
fn select_sample_bucket_unsupported() {
    let err = logical_plan(
        "SELECT count(*)
FROM person
TABLESAMPLE (BUCKET 3 OUT OF 16 ON id)",
    )
    .unwrap_err();
    assert_contains!(err.to_string(), "Table sample bucket is not supported");
}

#[test]
// A `REPEATABLE (seed)` clause parses but is rejected by the planner with a
// "not implemented" error.
fn select_sample_seed_unsupported() {
    let err = logical_plan(
        "SELECT count(*)
FROM person
TABLESAMPLE SYSTEM (3) REPEATABLE (82)",
    )
    .unwrap_err();
    assert_contains!(err.to_string(), "Table sample seed is not supported");
}
43 changes: 43 additions & 0 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1869,5 +1869,48 @@ select *, count(*) over() as ta from t;
3 4
1 4

# table sample random
query TT
EXPLAIN SELECT COUNT(*) from t TABLESAMPLE SYSTEM (42) WHERE a < 10;
----
logical_plan
01)Projection: count(Int64(1)) AS count(*)
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----Projection:
04)------Filter: t.a < Int32(10) AND random() < Float64(0.42)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
06)----------ProjectionExec: expr=[]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 < 10 AND random() < 0.42
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]

# sample random
query TT
EXPLAIN SELECT COUNT(*) from t SAMPLE 0.42 WHERE a < 10;
----
logical_plan
01)Projection: count(Int64(1)) AS count(*)
02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
03)----Projection:
04)------Filter: t.a < Int32(10) AND random() < Float64(0.42)
05)--------TableScan: t projection=[a]
physical_plan
01)ProjectionExec: expr=[count(Int64(1))@0 as count(*)]
02)--AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
06)----------ProjectionExec: expr=[]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: a@0 < 10 AND random() < 0.42
09)----------------DataSourceExec: partitions=1, partition_sizes=[1]


statement count 0
drop table t;