diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2715ad98202c..a455b09c1f9d 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -830,50 +830,45 @@ impl DefaultPhysicalPlanner { let (mut aggregates, filters, _order_bys): (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter); - let mut async_exprs = Vec::new(); - let num_input_columns = physical_input_schema.fields().len(); - - for agg_func in &mut aggregates { - match self.try_plan_async_exprs( - num_input_columns, - PlannedExprResult::Expr(agg_func.expressions()), - physical_input_schema.as_ref(), - )? { - PlanAsyncExpr::Async( - async_map, - PlannedExprResult::Expr(physical_exprs), - ) => { - async_exprs.extend(async_map.async_exprs); - - if let Some(new_agg_func) = agg_func.with_new_expressions( - physical_exprs, - agg_func - .order_bys() - .iter() - .cloned() - .map(|x| x.expr) - .collect(), - ) { - *agg_func = Arc::new(new_agg_func); - } else { - return internal_err!("Failed to plan async expression"); - } - } - PlanAsyncExpr::Sync(PlannedExprResult::Expr(_)) => { - // Do nothing - } - _ => { - return internal_err!( - "Unexpected result from try_plan_async_exprs" - ); + // Collect all expressions from all aggregate functions + let all_agg_exprs: Vec> = aggregates + .iter() + .flat_map(|agg| agg.expressions()) + .collect(); + + // Check if any aggregate expressions contain async UDFs + let (input_exec, rewritten_exprs) = + self.maybe_wrap_async_exec(input_exec, all_agg_exprs)?; + + // If we wrapped with AsyncFuncExec, update the aggregate functions + // with their rewritten expressions + if input_exec.schema().fields().len() + > physical_input_schema.fields().len() + { + let mut expr_iter = rewritten_exprs.into_iter(); + for agg_func in &mut aggregates { + let num_exprs = agg_func.expressions().len(); + let new_exprs: Vec<_> = + expr_iter.by_ref().take(num_exprs).collect(); + + if let Some(new_agg_func) = agg_func.with_new_expressions( + new_exprs, + agg_func + .order_bys() + .iter() + .cloned() + .map(|x| x.expr) + .collect(), + ) { + *agg_func = Arc::new(new_agg_func); + } else { + return internal_err!("Failed to plan async expression"); } } } - let input_exec = if !async_exprs.is_empty() { - Arc::new(AsyncFuncExec::try_new(async_exprs, input_exec)?) - } else { - input_exec - }; + + // Use the actual input schema (which includes async columns if wrapped) + let input_schema = input_exec.schema(); let initial_aggr = Arc::new(AggregateExec::try_new( AggregateMode::Partial, @@ -881,7 +876,7 @@ impl DefaultPhysicalPlanner { aggregates, filters.clone(), input_exec, - Arc::clone(&physical_input_schema), + Arc::clone(&input_schema), )?); let can_repartition = !groups.is_empty() @@ -912,7 +907,7 @@ impl DefaultPhysicalPlanner { updated_aggregates, filters, initial_aggr, - Arc::clone(&physical_input_schema), + input_schema, )?) } LogicalPlan::Projection(Projection { input, expr, .. }) => self @@ -927,45 +922,26 @@ impl DefaultPhysicalPlanner { }) => { let physical_input = children.one()?; let input_dfschema = input.schema(); + let num_input_columns = input_dfschema.fields().len(); let runtime_expr = self.create_physical_expr(predicate, input_dfschema, session_state)?; - let input_schema = input.schema(); - let filter = match self.try_plan_async_exprs( - input_schema.fields().len(), - PlannedExprResult::Expr(vec![runtime_expr]), - input_schema.as_arrow(), - )? { - PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)? - .with_batch_size(session_state.config().batch_size())? - } - PlanAsyncExpr::Async( - async_map, - PlannedExprResult::Expr(runtime_expr), - ) => { - let async_exec = AsyncFuncExec::try_new( - async_map.async_exprs, - physical_input, - )?; - FilterExec::try_new( - Arc::clone(&runtime_expr[0]), - Arc::new(async_exec), - )? - // project the output columns excluding the async functions - // The async functions are always appended to the end of the schema. - .with_projection(Some( - (0..input.schema().fields().len()).collect(), - ))? - .with_batch_size(session_state.config().batch_size())? - } - _ => { - return internal_err!( - "Unexpected result from try_plan_async_exprs" - ); - } - }; + let (wrapped_input, rewritten_exprs) = + self.maybe_wrap_async_exec(physical_input, vec![runtime_expr])?; + + // Check if we wrapped with AsyncFuncExec by comparing schemas + let has_async = wrapped_input.schema().fields().len() > num_input_columns; + + let mut filter = + FilterExec::try_new(Arc::clone(&rewritten_exprs[0]), wrapped_input)? + .with_batch_size(session_state.config().batch_size())?; + + if has_async { + // Project away the async function columns appended to the schema + filter = + filter.with_projection(Some((0..num_input_columns).collect()))?; + } let selectivity = session_state .config() @@ -2556,7 +2532,6 @@ impl DefaultPhysicalPlanner { expr: &[Expr], ) -> Result> { let input_logical_schema = input.as_ref().schema(); - let input_physical_schema = input_exec.schema(); let physical_exprs = expr .iter() .map(|e| { @@ -2595,99 +2570,88 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; - let num_input_columns = input_exec.schema().fields().len(); - - match self.try_plan_async_exprs( - num_input_columns, - PlannedExprResult::ExprWithName(physical_exprs), - input_physical_schema.as_ref(), - )? { - PlanAsyncExpr::Sync(PlannedExprResult::ExprWithName(physical_exprs)) => { - let proj_exprs: Vec = physical_exprs - .into_iter() - .map(|(expr, alias)| ProjectionExpr { expr, alias }) - .collect(); - Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?)) - } - PlanAsyncExpr::Async( - async_map, - PlannedExprResult::ExprWithName(physical_exprs), - ) => { - let async_exec = - AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?; - let proj_exprs: Vec = physical_exprs - .into_iter() - .map(|(expr, alias)| ProjectionExpr { expr, alias }) - .collect(); - let new_proj_exec = - ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?; - Ok(Arc::new(new_proj_exec)) - } - _ => internal_err!("Unexpected PlanAsyncExpressions variant"), - } + let (input_exec, physical_exprs) = + self.maybe_wrap_async_exec_with_names(input_exec, physical_exprs)?; + + let proj_exprs: Vec = physical_exprs + .into_iter() + .map(|(expr, alias)| ProjectionExpr { expr, alias }) + .collect(); + + Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?)) } - fn try_plan_async_exprs( + /// Wraps `input_exec` with `AsyncFuncExec` if any expressions contain async UDFs. + /// + /// Returns the (possibly wrapped) execution plan and rewritten expressions. + /// If no async UDFs are found, returns the original input and expressions unchanged. + #[expect(clippy::type_complexity)] + fn maybe_wrap_async_exec( &self, - num_input_columns: usize, - physical_expr: PlannedExprResult, - schema: &Schema, - ) -> Result { + input_exec: Arc, + exprs: Vec>, + ) -> Result<(Arc, Vec>)> { + let schema = input_exec.schema(); + let num_input_columns = schema.fields().len(); let mut async_map = AsyncMapper::new(num_input_columns); - match &physical_expr { - PlannedExprResult::ExprWithName(exprs) => { - exprs - .iter() - .try_for_each(|(expr, _)| async_map.find_references(expr, schema))?; - } - PlannedExprResult::Expr(exprs) => { - exprs - .iter() - .try_for_each(|expr| async_map.find_references(expr, schema))?; - } + + for expr in &exprs { + async_map.find_references(expr, schema.as_ref())?; } if async_map.is_empty() { - return Ok(PlanAsyncExpr::Sync(physical_expr)); + return Ok((input_exec, exprs)); } - let new_exprs = match physical_expr { - PlannedExprResult::ExprWithName(exprs) => PlannedExprResult::ExprWithName( - exprs - .iter() - .map(|(expr, column_name)| { - let new_expr = Arc::clone(expr) - .transform_up(|e| Ok(async_map.map_expr(e)))?; - Ok((new_expr.data, column_name.to_string())) - }) - .collect::>()?, - ), - PlannedExprResult::Expr(exprs) => PlannedExprResult::Expr( - exprs - .iter() - .map(|expr| { - let new_expr = Arc::clone(expr) - .transform_up(|e| Ok(async_map.map_expr(e)))?; - Ok(new_expr.data) - }) - .collect::>()?, - ), - }; - // rewrite the projection's expressions in terms of the columns with the result of async evaluation - Ok(PlanAsyncExpr::Async(async_map, new_exprs)) + let rewritten_exprs = exprs + .into_iter() + .map(|expr| { + let new_expr = expr.transform_up(|e| Ok(async_map.map_expr(e)))?; + Ok(new_expr.data) + }) + .collect::>>()?; + + let async_exec = + Arc::new(AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?); + + Ok((async_exec, rewritten_exprs)) } -} -#[derive(Debug)] -enum PlannedExprResult { - ExprWithName(Vec<(Arc, String)>), - Expr(Vec>), -} + /// Wraps `input_exec` with `AsyncFuncExec` if any expressions contain async UDFs. + /// + /// Variant for named expressions (used by projections). + /// Returns the (possibly wrapped) execution plan and rewritten expressions with names. + #[expect(clippy::type_complexity)] + fn maybe_wrap_async_exec_with_names( + &self, + input_exec: Arc, + exprs: Vec<(Arc, String)>, + ) -> Result<(Arc, Vec<(Arc, String)>)> { + let schema = input_exec.schema(); + let num_input_columns = schema.fields().len(); + let mut async_map = AsyncMapper::new(num_input_columns); -#[derive(Debug)] -enum PlanAsyncExpr { - Sync(PlannedExprResult), - Async(AsyncMapper, PlannedExprResult), + for (expr, _) in &exprs { + async_map.find_references(expr, schema.as_ref())?; + } + + if async_map.is_empty() { + return Ok((input_exec, exprs)); + } + + let rewritten_exprs = exprs + .into_iter() + .map(|(expr, name)| { + let new_expr = expr.transform_up(|e| Ok(async_map.map_expr(e)))?; + Ok((new_expr.data, name)) + }) + .collect::>>()?; + + let async_exec = + Arc::new(AsyncFuncExec::try_new(async_map.async_exprs, input_exec)?); + + Ok((async_exec, rewritten_exprs)) + } } fn tuple_err(value: (Result, Result)) -> Result<(T, R)> {