diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index de50181..e6b5fda 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -1447,7 +1447,7 @@ mod test { .enumerate() .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i)) .collect(); - println!("indices: {:?}", partition_col_indices); + println!("indices: {partition_col_indices:?}"); let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; println!( "inexact projection pushdown:\n{}", diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs index 6476f39..fa12cdf 100644 --- a/src/materialized/row_metadata.rs +++ b/src/materialized/row_metadata.rs @@ -98,7 +98,7 @@ impl RowMetadataRegistry { .get(&table.to_string()) .map(|o| Arc::clone(o.value())) .or_else(|| self.default_source.clone()) - .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table))) + .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {table}"))) } } diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index e8f3003..dbcdc3e 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -43,6 +43,16 @@ use super::QueryRewriteOptions; /// A cost function. Used to evaluate the best physical plan among multiple equivalent choices. pub type CostFn = Arc f64 + Send + Sync>; +/// Wrapper for CostFn that implements Debug. +#[derive(Clone)] +pub struct CostFnWrapper(pub CostFn); + +impl std::fmt::Debug for CostFnWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CostFnWrapper") + } +} + /// A logical optimizer that generates candidate logical plans in the form of [`OneOf`] nodes. #[derive(Debug)] pub struct ViewMatcher { @@ -210,21 +220,11 @@ fn locate_single_table_scan(node: &LogicalPlan) -> Result Ok(table_reference) } -/// [`ExtensionPlanner`]` that chooses the best plan from a `OneOf` node. -pub struct ViewExploitationPlanner { - cost: CostFn, -} - -impl ViewExploitationPlanner { - /// Initialize this ViewExploitationPlanner with a given cost function. - pub fn new(cost: CostFn) -> Self { - Self { cost } - } -} +/// [`ExtensionPlanner`]` that converts `OneOf` logical nodes into a correspanding physical plan +pub struct ViewExploitationPlanner; #[async_trait] impl ExtensionPlanner for ViewExploitationPlanner { - /// Choose the best candidate and use it for the physical plan. async fn plan_extension( &self, _planner: &dyn PhysicalPlanner, @@ -260,7 +260,6 @@ impl ExtensionPlanner for ViewExploitationPlanner { Ok(Some(Arc::new(OneOfExec::try_new( physical_inputs.to_vec(), None, - Arc::clone(&self.cost), )?))) } } @@ -317,10 +316,6 @@ pub struct OneOfExec { // This will inform DataFusion to add sorts to children, // which will improve cost estimation of candidates required_input_ordering: Option, - // Index of the candidate with the best cost - best: usize, - // Cost function to use in optimization - cost: CostFn, } impl std::fmt::Debug for OneOfExec { @@ -328,7 +323,6 @@ impl std::fmt::Debug for OneOfExec { f.debug_struct("OneOfExec") .field("candidates", &self.candidates) .field("required_input_ordering", &self.required_input_ordering) - .field("best", &self.best) .finish_non_exhaustive() } } @@ -338,32 +332,19 @@ impl OneOfExec { pub fn try_new( candidates: Vec>, required_input_ordering: Option, - cost: CostFn, ) -> Result { if candidates.is_empty() { return Err(DataFusionError::Plan( "can't create OneOfExec with empty children".to_string(), )); } - let best = candidates - .iter() - .position_min_by_key(|candidate| OrderedFloat(cost(candidate.as_ref()))) - .unwrap(); Ok(Self { candidates, required_input_ordering, - best, - cost, }) } - /// Return the best of this `OneOfExec`'s children, using the cost function provided to - /// this plan at initialization timee - pub fn best(&self) -> Arc { - Arc::clone(&self.candidates[self.best]) - } - /// Modify this plan's required input ordering. /// Used for sort pushdown pub fn with_required_input_ordering(self, requirement: Option) -> Self { @@ -383,8 +364,12 @@ impl ExecutionPlan for OneOfExec { self } + // The method will be called in some physical optimizer rules, + // Such as `EnforceDistribution` and `EnforceOrdering`. + // But the implementation of the method is fragile + // So please call `PruneCandidates` optimizer rule before such rules to remove the OneofExec fn properties(&self) -> &PlanProperties { - self.candidates[self.best].properties() + self.candidates[0].properties() } fn required_input_ordering(&self) -> Vec> { @@ -414,7 +399,6 @@ impl ExecutionPlan for OneOfExec { Ok(Arc::new(Self::try_new( children, self.required_input_ordering.clone(), - Arc::clone(&self.cost), )?)) } @@ -423,35 +407,25 @@ impl ExecutionPlan for OneOfExec { partition: usize, context: Arc, ) -> Result { - self.candidates[self.best].execute(partition, context) - } - - fn statistics(&self) -> Result { - self.candidates[self.best].partition_statistics(None) + // The method shouldn't be called in practice, as the execution plan will be removed by optimizer. + self.candidates[0].execute(partition, context) } fn partition_statistics( &self, partition: Option, ) -> Result { - self.candidates[self.best].partition_statistics(partition) + self.candidates[0].partition_statistics(partition) } } impl DisplayAs for OneOfExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let costs = self - .children() - .iter() - .map(|c| (self.cost)(c.as_ref())) - .collect_vec(); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, - "OneOfExec(best={}), costs=[{}], required_input_ordering=[{}]", - self.best, - costs.into_iter().join(","), + "OneOfExec: required_input_ordering=[{}]", format_physical_sort_requirement_list( &self .required_input_ordering @@ -470,8 +444,20 @@ impl DisplayAs for OneOfExec { } /// Finalize selection of best candidate plan from a OneOfExec. -#[derive(Debug, Clone, Default)] -pub struct PruneCandidates; +#[derive(Debug, Clone)] +pub struct PruneCandidates { + /// Cost function to use in optimization + pub cost: CostFnWrapper, +} + +impl PruneCandidates { + /// Create a new `PruneCandidates` optimizer rule with the given cost function. + pub fn new(cost: CostFn) -> Self { + Self { + cost: CostFnWrapper(cost), + } + } +} impl PhysicalOptimizerRule for PruneCandidates { fn optimize( @@ -482,8 +468,16 @@ impl PhysicalOptimizerRule for PruneCandidates { // Search for any OneOfExec nodes. plan.transform(&|plan: Arc| { if let Some(one_of_exec) = plan.as_any().downcast_ref::() { + let candidates = one_of_exec.children(); + let best = candidates + .iter() + .position_min_by_key(|candidate| { + OrderedFloat((self.cost.0)(candidate.as_ref())) + }) + .unwrap(); + Ok(Transformed::new( - one_of_exec.best(), + Arc::clone(candidates[best]), true, TreeNodeRecursion::Jump, ))