Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 124 additions & 160 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,58 +830,53 @@ impl DefaultPhysicalPlanner {
let (mut aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) =
multiunzip(agg_filter);

let mut async_exprs = Vec::new();
let num_input_columns = physical_input_schema.fields().len();

for agg_func in &mut aggregates {
match self.try_plan_async_exprs(
num_input_columns,
PlannedExprResult::Expr(agg_func.expressions()),
physical_input_schema.as_ref(),
)? {
PlanAsyncExpr::Async(
async_map,
PlannedExprResult::Expr(physical_exprs),
) => {
async_exprs.extend(async_map.async_exprs);

if let Some(new_agg_func) = agg_func.with_new_expressions(
physical_exprs,
agg_func
.order_bys()
.iter()
.cloned()
.map(|x| x.expr)
.collect(),
) {
*agg_func = Arc::new(new_agg_func);
} else {
return internal_err!("Failed to plan async expression");
}
}
PlanAsyncExpr::Sync(PlannedExprResult::Expr(_)) => {
// Do nothing
}
_ => {
return internal_err!(
"Unexpected result from try_plan_async_exprs"
);
// Collect all expressions from all aggregate functions
let all_agg_exprs: Vec<Arc<dyn PhysicalExpr>> = aggregates
.iter()
.flat_map(|agg| agg.expressions())
.collect();

// Check if any aggregate expressions contain async UDFs
let (input_exec, rewritten_exprs) =
self.maybe_wrap_async_exec(input_exec, all_agg_exprs)?;

// If we wrapped with AsyncFuncExec, update the aggregate functions
// with their rewritten expressions
if input_exec.schema().fields().len()
> physical_input_schema.fields().len()
{
let mut expr_iter = rewritten_exprs.into_iter();
for agg_func in &mut aggregates {
let num_exprs = agg_func.expressions().len();
let new_exprs: Vec<_> =
expr_iter.by_ref().take(num_exprs).collect();

if let Some(new_agg_func) = agg_func.with_new_expressions(
new_exprs,
agg_func
.order_bys()
.iter()
.cloned()
.map(|x| x.expr)
.collect(),
) {
*agg_func = Arc::new(new_agg_func);
} else {
return internal_err!("Failed to plan async expression");
}
}
}
let input_exec = if !async_exprs.is_empty() {
Arc::new(AsyncFuncExec::try_new(async_exprs, input_exec)?)
} else {
input_exec
};

// Use the actual input schema (which includes async columns if wrapped)
let input_schema = input_exec.schema();

let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates,
filters.clone(),
input_exec,
Arc::clone(&physical_input_schema),
Arc::clone(&input_schema),
)?);

let can_repartition = !groups.is_empty()
Expand Down Expand Up @@ -912,7 +907,7 @@ impl DefaultPhysicalPlanner {
updated_aggregates,
filters,
initial_aggr,
Arc::clone(&physical_input_schema),
input_schema,
)?)
}
LogicalPlan::Projection(Projection { input, expr, .. }) => self
Expand All @@ -927,45 +922,26 @@ impl DefaultPhysicalPlanner {
}) => {
let physical_input = children.one()?;
let input_dfschema = input.schema();
let num_input_columns = input_dfschema.fields().len();

let runtime_expr =
self.create_physical_expr(predicate, input_dfschema, session_state)?;

let input_schema = input.schema();
let filter = match self.try_plan_async_exprs(
input_schema.fields().len(),
PlannedExprResult::Expr(vec![runtime_expr]),
input_schema.as_arrow(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
.with_batch_size(session_state.config().batch_size())?
}
PlanAsyncExpr::Async(
async_map,
PlannedExprResult::Expr(runtime_expr),
) => {
let async_exec = AsyncFuncExec::try_new(
async_map.async_exprs,
physical_input,
)?;
FilterExec::try_new(
Arc::clone(&runtime_expr[0]),
Arc::new(async_exec),
)?
// project the output columns excluding the async functions
// The async functions are always appended to the end of the schema.
.with_projection(Some(
(0..input.schema().fields().len()).collect(),
))?
.with_batch_size(session_state.config().batch_size())?
}
_ => {
return internal_err!(
"Unexpected result from try_plan_async_exprs"
);
}
};
let (wrapped_input, rewritten_exprs) =
self.maybe_wrap_async_exec(physical_input, vec![runtime_expr])?;

// Check if we wrapped with AsyncFuncExec by comparing schemas
let has_async = wrapped_input.schema().fields().len() > num_input_columns;

let mut filter =
FilterExec::try_new(Arc::clone(&rewritten_exprs[0]), wrapped_input)?
.with_batch_size(session_state.config().batch_size())?;

if has_async {
// Project away the async function columns appended to the schema
filter =
filter.with_projection(Some((0..num_input_columns).collect()))?;
}

let selectivity = session_state
.config()
Expand Down Expand Up @@ -2556,7 +2532,6 @@ impl DefaultPhysicalPlanner {
expr: &[Expr],
) -> Result<Arc<dyn ExecutionPlan>> {
let input_logical_schema = input.as_ref().schema();
let input_physical_schema = input_exec.schema();
let physical_exprs = expr
.iter()
.map(|e| {
Expand Down Expand Up @@ -2595,99 +2570,88 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<Vec<_>>>()?;

let num_input_columns = input_exec.schema().fields().len();

match self.try_plan_async_exprs(
num_input_columns,
PlannedExprResult::ExprWithName(physical_exprs),
input_physical_schema.as_ref(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => {
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
}
PlanAsyncExpr::Async(
async_map,
PlannedExprResult::ExprWithName(physical_exprs),
) => {
let async_exec =
AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?;
let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();
let new_proj_exec =
ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?;
Ok(Arc::new(new_proj_exec))
}
_ => internal_err!("Unexpected PlanAsyncExpressions variant"),
}
let (input_exec, physical_exprs) =
self.maybe_wrap_async_exec_with_names(input_exec, physical_exprs)?;

let proj_exprs: Vec<ProjectionExpr> = physical_exprs
.into_iter()
.map(|(expr, alias)| ProjectionExpr { expr, alias })
.collect();

Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?))
}

fn try_plan_async_exprs(
/// Wraps `input_exec` with `AsyncFuncExec` if any expressions contain async UDFs.
///
/// Returns the (possibly wrapped) execution plan and rewritten expressions.
/// If no async UDFs are found, returns the original input and expressions unchanged.
#[expect(clippy::type_complexity)]
fn maybe_wrap_async_exec(
&self,
num_input_columns: usize,
physical_expr: PlannedExprResult,
schema: &Schema,
) -> Result<PlanAsyncExpr> {
input_exec: Arc<dyn ExecutionPlan>,
exprs: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<(Arc<dyn ExecutionPlan>, Vec<Arc<dyn PhysicalExpr>>)> {
let schema = input_exec.schema();
let num_input_columns = schema.fields().len();
let mut async_map = AsyncMapper::new(num_input_columns);
match &physical_expr {
PlannedExprResult::ExprWithName(exprs) => {
exprs
.iter()
.try_for_each(|(expr, _)| async_map.find_references(expr, schema))?;
}
PlannedExprResult::Expr(exprs) => {
exprs
.iter()
.try_for_each(|expr| async_map.find_references(expr, schema))?;
}

for expr in &exprs {
async_map.find_references(expr, schema.as_ref())?;
}

if async_map.is_empty() {
return Ok(PlanAsyncExpr::Sync(physical_expr));
return Ok((input_exec, exprs));
}

let new_exprs = match physical_expr {
PlannedExprResult::ExprWithName(exprs) => PlannedExprResult::ExprWithName(
exprs
.iter()
.map(|(expr, column_name)| {
let new_expr = Arc::clone(expr)
.transform_up(|e| Ok(async_map.map_expr(e)))?;
Ok((new_expr.data, column_name.to_string()))
})
.collect::<Result<_>>()?,
),
PlannedExprResult::Expr(exprs) => PlannedExprResult::Expr(
exprs
.iter()
.map(|expr| {
let new_expr = Arc::clone(expr)
.transform_up(|e| Ok(async_map.map_expr(e)))?;
Ok(new_expr.data)
})
.collect::<Result<_>>()?,
),
};
// rewrite the projection's expressions in terms of the columns with the result of async evaluation
Ok(PlanAsyncExpr::Async(async_map, new_exprs))
let rewritten_exprs = exprs
.into_iter()
.map(|expr| {
let new_expr = expr.transform_up(|e| Ok(async_map.map_expr(e)))?;
Ok(new_expr.data)
})
.collect::<Result<Vec<_>>>()?;

let async_exec =
Arc::new(AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?);

Ok((async_exec, rewritten_exprs))
}
}

#[derive(Debug)]
enum PlannedExprResult {
ExprWithName(Vec<(Arc<dyn PhysicalExpr>, String)>),
Expr(Vec<Arc<dyn PhysicalExpr>>),
}
/// Wraps `input_exec` with `AsyncFuncExec` if any expressions contain async UDFs.
///
/// Variant for named expressions (used by projections).
/// Returns the (possibly wrapped) execution plan and rewritten expressions with names.
#[expect(clippy::type_complexity)]
fn maybe_wrap_async_exec_with_names(
&self,
input_exec: Arc<dyn ExecutionPlan>,
exprs: Vec<(Arc<dyn PhysicalExpr>, String)>,
) -> Result<(Arc<dyn ExecutionPlan>, Vec<(Arc<dyn PhysicalExpr>, String)>)> {
let schema = input_exec.schema();
let num_input_columns = schema.fields().len();
let mut async_map = AsyncMapper::new(num_input_columns);

#[derive(Debug)]
enum PlanAsyncExpr {
Sync(PlannedExprResult),
Async(AsyncMapper, PlannedExprResult),
for (expr, _) in &exprs {
async_map.find_references(expr, schema.as_ref())?;
}

if async_map.is_empty() {
return Ok((input_exec, exprs));
}

let rewritten_exprs = exprs
.into_iter()
.map(|(expr, name)| {
let new_expr = expr.transform_up(|e| Ok(async_map.map_expr(e)))?;
Ok((new_expr.data, name))
})
.collect::<Result<Vec<_>>>()?;

let async_exec =
Arc::new(AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?);

Ok((async_exec, rewritten_exprs))
}
}

fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
Expand Down