Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,7 @@ mod test {
.enumerate()
.filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i))
.collect();
println!("indices: {:?}", partition_col_indices);
println!("indices: {partition_col_indices:?}");
let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?;
println!(
"inexact projection pushdown:\n{}",
Expand Down
2 changes: 1 addition & 1 deletion src/materialized/row_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl RowMetadataRegistry {
.get(&table.to_string())
.map(|o| Arc::clone(o.value()))
.or_else(|| self.default_source.clone())
.ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
.ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {table}")))
}
}

Expand Down
94 changes: 44 additions & 50 deletions src/rewrite/exploitation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ use super::QueryRewriteOptions;
/// A cost function. Used to evaluate the best physical plan among multiple equivalent choices.
pub type CostFn = Arc<dyn Fn(&dyn ExecutionPlan) -> f64 + Send + Sync>;

/// Wrapper for CostFn that implements Debug.
#[derive(Clone)]
pub struct CostFnWrapper(pub CostFn);

impl std::fmt::Debug for CostFnWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CostFnWrapper")
}
}

/// A logical optimizer that generates candidate logical plans in the form of [`OneOf`] nodes.
#[derive(Debug)]
pub struct ViewMatcher {
Expand Down Expand Up @@ -210,21 +220,11 @@ fn locate_single_table_scan(node: &LogicalPlan) -> Result<Option<TableReference>
Ok(table_reference)
}

/// [`ExtensionPlanner`]` that chooses the best plan from a `OneOf` node.
pub struct ViewExploitationPlanner {
cost: CostFn,
}

impl ViewExploitationPlanner {
/// Initialize this ViewExploitationPlanner with a given cost function.
pub fn new(cost: CostFn) -> Self {
Self { cost }
}
}
/// [`ExtensionPlanner`]` that converts `OneOf` logical nodes into a correspanding physical plan
pub struct ViewExploitationPlanner;

#[async_trait]
impl ExtensionPlanner for ViewExploitationPlanner {
/// Choose the best candidate and use it for the physical plan.
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
Expand Down Expand Up @@ -260,7 +260,6 @@ impl ExtensionPlanner for ViewExploitationPlanner {
Ok(Some(Arc::new(OneOfExec::try_new(
physical_inputs.to_vec(),
None,
Arc::clone(&self.cost),
)?)))
}
}
Expand Down Expand Up @@ -317,18 +316,13 @@ pub struct OneOfExec {
// This will inform DataFusion to add sorts to children,
// which will improve cost estimation of candidates
required_input_ordering: Option<LexRequirement>,
// Index of the candidate with the best cost
best: usize,
// Cost function to use in optimization
cost: CostFn,
}

impl std::fmt::Debug for OneOfExec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OneOfExec")
.field("candidates", &self.candidates)
.field("required_input_ordering", &self.required_input_ordering)
.field("best", &self.best)
.finish_non_exhaustive()
}
}
Expand All @@ -338,32 +332,19 @@ impl OneOfExec {
pub fn try_new(
candidates: Vec<Arc<dyn ExecutionPlan>>,
required_input_ordering: Option<LexRequirement>,
cost: CostFn,
) -> Result<Self> {
if candidates.is_empty() {
return Err(DataFusionError::Plan(
"can't create OneOfExec with empty children".to_string(),
));
}
let best = candidates
.iter()
.position_min_by_key(|candidate| OrderedFloat(cost(candidate.as_ref())))
.unwrap();

Ok(Self {
candidates,
required_input_ordering,
best,
cost,
})
}

/// Return the best of this `OneOfExec`'s children, using the cost function provided to
/// this plan at initialization timee
pub fn best(&self) -> Arc<dyn ExecutionPlan> {
Arc::clone(&self.candidates[self.best])
}

/// Modify this plan's required input ordering.
/// Used for sort pushdown
pub fn with_required_input_ordering(self, requirement: Option<LexRequirement>) -> Self {
Expand All @@ -383,8 +364,12 @@ impl ExecutionPlan for OneOfExec {
self
}

// The method will be called in some physical optimizer rules,
// Such as `EnforceDistribution` and `EnforceOrdering`.
// But the implementation of the method is fragile
// So please call `PruneCandidates` optimizer rule before such rules to remove the OneofExec
fn properties(&self) -> &PlanProperties {
self.candidates[self.best].properties()
self.candidates[0].properties()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there will be potential bug, need to think more 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some NOTES for the method to avoid potential bug

}

fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
Expand Down Expand Up @@ -414,7 +399,6 @@ impl ExecutionPlan for OneOfExec {
Ok(Arc::new(Self::try_new(
children,
self.required_input_ordering.clone(),
Arc::clone(&self.cost),
)?))
}

Expand All @@ -423,35 +407,25 @@ impl ExecutionPlan for OneOfExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.candidates[self.best].execute(partition, context)
}

fn statistics(&self) -> Result<datafusion_common::Statistics> {
self.candidates[self.best].partition_statistics(None)
// The method shouldn't be called in practice, as the execution plan will be removed by optimizer.
self.candidates[0].execute(partition, context)
}

fn partition_statistics(
&self,
partition: Option<usize>,
) -> Result<datafusion_common::Statistics> {
self.candidates[self.best].partition_statistics(partition)
self.candidates[0].partition_statistics(partition)
}
}

impl DisplayAs for OneOfExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let costs = self
.children()
.iter()
.map(|c| (self.cost)(c.as_ref()))
.collect_vec();
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"OneOfExec(best={}), costs=[{}], required_input_ordering=[{}]",
self.best,
costs.into_iter().join(","),
"OneOfExec: required_input_ordering=[{}]",
format_physical_sort_requirement_list(
&self
.required_input_ordering
Expand All @@ -470,8 +444,20 @@ impl DisplayAs for OneOfExec {
}

/// Finalize selection of best candidate plan from a OneOfExec.
#[derive(Debug, Clone, Default)]
pub struct PruneCandidates;
#[derive(Debug, Clone)]
pub struct PruneCandidates {
/// Cost function to use in optimization
pub cost: CostFnWrapper,
}

impl PruneCandidates {
/// Create a new `PruneCandidates` optimizer rule with the given cost function.
pub fn new(cost: CostFn) -> Self {
Self {
cost: CostFnWrapper(cost),
}
}
}

impl PhysicalOptimizerRule for PruneCandidates {
fn optimize(
Expand All @@ -482,8 +468,16 @@ impl PhysicalOptimizerRule for PruneCandidates {
// Search for any OneOfExec nodes.
plan.transform(&|plan: Arc<dyn ExecutionPlan>| {
if let Some(one_of_exec) = plan.as_any().downcast_ref::<OneOfExec>() {
let candidates = one_of_exec.children();
let best = candidates
.iter()
.position_min_by_key(|candidate| {
OrderedFloat((self.cost.0)(candidate.as_ref()))
})
.unwrap();

Ok(Transformed::new(
one_of_exec.best(),
Arc::clone(candidates[best]),
true,
TreeNodeRecursion::Jump,
))
Expand Down