1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
@@ -52,6 +52,7 @@ arrow = { workspace = true }
bigdecimal = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true, default-features = true }
indexmap = { workspace = true }
log = { workspace = true }
recursive = { workspace = true, optional = true }
151 changes: 145 additions & 6 deletions datafusion/sql/src/select.rs
@@ -31,7 +31,9 @@ use datafusion_common::error::DataFusionErrorBuilder;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, Result};
use datafusion_common::{RecursionUnnestOption, UnnestOptions};
use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
use datafusion_expr::expr::{
Alias, PlannedReplaceSelectItem, ScalarFunction, WildcardOptions,
};
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
};
@@ -40,14 +42,17 @@ use datafusion_expr::utils::{
expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
LogicalPlanBuilderOptions, Partitioning,
lit, Aggregate, BinaryExpr, Expr, Filter, GroupingSet, LogicalPlan,
LogicalPlanBuilder, LogicalPlanBuilderOptions, Operator, Partitioning, ScalarUDF,
};
use datafusion_functions::math::random::RandomFunc;

use indexmap::IndexMap;
use sqlparser::ast::{
visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr,
OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType,
OrderBy, SelectItemQualifiedWildcardKind, TableFactor, TableSampleKind,
TableSampleModifier, TableSampleQuantity, TableSampleUnit, WildcardAdditionalOptions,
WindowType,
};
use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};

@@ -77,11 +82,29 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}

// Process `from` clause
let plan = self.plan_from_tables(select.from, planner_context)?;
let plan = self.plan_from_tables(select.from.clone(), planner_context)?;
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));

// Process `where` clause
let base_plan = self.plan_selection(select.selection, plan, planner_context)?;
let mut base_plan =
2010YOUY01 (Contributor) commented on Jun 9, 2025:

I feel we'd better do this rewrite in a separate logical optimizer rule, to keep the planning code clean. It can be done with a follow-up PR before adding more functionality to scan sampling.
(Unless there's a specific reason to do this during the planning phase — I did notice some rewrites happening during planning, but I'm not sure why.)

A project member replied:

"logical optimizer rule" mainly focuses on Optimization, I think it's fair to rewrite during planning phase.

self.plan_selection(select.selection, plan, planner_context)?;

// Now `base_plan` is a LogicalPlan::Filter
if let Some(from_first) = select.from.first() {
if let TableFactor::Table {
sample: Some(sample),
..
} = &from_first.relation
{
// Rewrite SAMPLE / TABLESAMPLE clause to additional filters
// TODO: handle samples from joined tables
base_plan = self.sample_to_where_random_clause(
base_plan,
sample,
planner_context,
)?;
}
}

// Handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
@@ -556,6 +579,122 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
}
}

    /// Extract an expression for the table sample quantity
    fn sample_quantity_value(
&self,
quantity: &TableSampleQuantity,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
match &quantity.value {
// Support only numeric literals now
SQLExpr::Value(value_with_span) => Ok(self.parse_value(
value_with_span.value.clone(),
planner_context.prepare_param_data_types(),
)?),
_ => not_impl_err!(
"Table quantity value {:?} is not supported",
&quantity.value
),
}
}

    /// Compose the threshold expression for the TABLE SAMPLE filter
fn sample_to_threshold_expr(
&self,
table_sample_kind: &TableSampleKind,
planner_context: &mut PlannerContext,
) -> Result<Box<Expr>> {
        // Support the sample clause both before and after the table alias
let table_sample = match table_sample_kind {
// Standard syntax
TableSampleKind::BeforeTableAlias(kind) => kind,
// Hive syntax
TableSampleKind::AfterTableAlias(kind) => kind,
};

        // These features are not part of the common SQL specification
        // and are not implemented yet.
if table_sample.seed.is_some() {
return not_impl_err!("Table sample seed is not supported");
}
if table_sample.bucket.is_some() {
return not_impl_err!("Table sample bucket is not supported");
}
if table_sample.offset.is_some() {
return not_impl_err!("Table sample offset is not supported");
}

if let Some(table_sample_quantity) = &table_sample.quantity {
match table_sample_quantity.unit {
Some(TableSampleUnit::Rows) => {
// Fixed size row sampling is not supported
not_impl_err!("Table sample with rows unit is not supported")
}
Some(TableSampleUnit::Percent) | None => {
// There are two flavors of sampling (`TableSampleMethod`):
// - Block-level sampling (SYSTEM or BLOCK keywords)
// - Row-level sampling (BERNOULLI or ROW keywords)
// `random()` filter pushdown allows only block-level sampling,
                    // not row-level. However, we do not forbid using BERNOULLI/ROW.

// Extract quantity from SQLExpr
                    let quantity_value: Expr = self
                        .sample_quantity_value(table_sample_quantity, planner_context)?;

let ratio: Expr = match table_sample.modifier {
TableSampleModifier::TableSample =>
// SELECT * FROM tbl TABLESAMPLE SYSTEM (10),
// Value is percentage
{
Expr::BinaryExpr(BinaryExpr::new(
Box::new(quantity_value),
Operator::Divide,
Box::new(lit(100.0)),
))
}
TableSampleModifier::Sample =>
// SELECT * FROM tbl SAMPLE 0.1
// Value is floating ratio, pass as is
{
quantity_value
}
};

let random_threshold = Box::new(ratio);
Ok(random_threshold)
}
}
} else {
plan_err!("Table sample quantity must be specified")
}
}

    /// Compose a logical plan with a static Filter based on the TABLE SAMPLE expression
fn sample_to_where_random_clause(
&self,
plan: LogicalPlan,
sample_kind: &TableSampleKind,
planner_context: &mut PlannerContext,
) -> Result<LogicalPlan> {
// `random()` call
let random_udf = ScalarUDF::new_from_impl(RandomFunc::new());
let random_expr_call = Box::new(Expr::ScalarFunction(ScalarFunction::new_udf(
Arc::new(random_udf),
vec![],
)));
let random_threshold =
self.sample_to_threshold_expr(sample_kind, planner_context)?;
        // New filter predicate, e.g. `random() < 0.1`
let predicate = Expr::BinaryExpr(BinaryExpr::new(
random_expr_call,
Operator::Lt,
random_threshold,
));
let random_filter = Filter::try_new(predicate, Arc::new(plan));

Ok(LogicalPlan::Filter(random_filter?))
}

pub(crate) fn plan_from_tables(
&self,
mut from: Vec<TableWithJoins>,