Skip to content

Commit ceff6cb

Browse files
Orthogonalize distribution and sort enforcement rules into EnforceDistribution and EnforceSorting (apache#4839)
* Separate sort rule * Migrate to clearer file name, tidy up comments * Add a note about tests verifying EnforceDistribution/EnforceSorting jointly * Address review, fix the stale comment Co-authored-by: Mehmet Ozan Kabak <[email protected]>
1 parent c5e2594 commit ceff6cb

File tree

7 files changed

+87
-87
lines changed

7 files changed

+87
-87
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use crate::physical_optimizer::repartition::Repartition;
6868

6969
use crate::config::ConfigOptions;
7070
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
71-
use crate::physical_optimizer::enforcement::BasicEnforcement;
71+
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
7272
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
7373
use crate::physical_plan::planner::DefaultPhysicalPlanner;
7474
use crate::physical_plan::udaf::AggregateUDF;
@@ -91,9 +91,9 @@ use crate::catalog::listing_schema::ListingSchemaProvider;
9191
use crate::datasource::object_store::ObjectStoreUrl;
9292
use crate::execution::memory_pool::MemoryPool;
9393
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
94-
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
9594
use crate::physical_optimizer::pipeline_checker::PipelineChecker;
9695
use crate::physical_optimizer::pipeline_fixer::PipelineFixer;
96+
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
9797
use datafusion_optimizer::OptimizerConfig;
9898
use datafusion_sql::planner::object_name_to_table_reference;
9999
use uuid::Uuid;
@@ -1448,37 +1448,36 @@ impl SessionState {
14481448
// output partitioning of some operators in the plan tree, which will influence
14491449
// other rules. Therefore, it should run as soon as possible. It is optional because:
14501450
// - It's not used for the distributed engine, Ballista.
1451-
// - It's conflicted with some parts of the BasicEnforcement, since it will
1452-
// introduce additional repartitioning while the BasicEnforcement aims at
1453-
// reducing unnecessary repartitioning.
1451+
// - It conflicts with some parts of the EnforceDistribution rule, since it will
1452+
// introduce additional repartitioning while EnforceDistribution aims to
1453+
// reduce unnecessary repartitioning.
14541454
Arc::new(Repartition::new()),
14551455
// - Currently it will depend on the partition number to decide whether to change the
14561456
// single node sort to parallel local sort and merge. Therefore, GlobalSortSelection
14571457
// should run after the Repartition.
14581458
// - Since it will change the output ordering of some operators, it should run
1459-
// before JoinSelection and BasicEnforcement, which may depend on that.
1459+
// before JoinSelection and EnforceSorting, which may depend on that.
14601460
Arc::new(GlobalSortSelection::new()),
14611461
// Statistics-based join selection will change the Auto mode to a real join implementation,
1462-
// like collect left, or hash join, or future sort merge join, which will
1463-
// influence the BasicEnforcement to decide whether to add additional repartition
1464-
// and local sort to meet the distribution and ordering requirements.
1465-
// Therefore, it should run before BasicEnforcement.
1462+
// like collect left, or hash join, or future sort merge join, which will influence the
1463+
// EnforceDistribution and EnforceSorting rules as they decide whether to add additional
1464+
// repartitioning and local sorting steps to meet distribution and ordering requirements.
1465+
// Therefore, it should run before EnforceDistribution and EnforceSorting.
14661466
Arc::new(JoinSelection::new()),
14671467
// If the query is processing infinite inputs, the PipelineFixer rule applies the
14681468
// necessary transformations to make the query runnable (if it is not already runnable).
14691469
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
14701470
// Since the transformations it applies may alter output partitioning properties of operators
1471-
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
1471+
// (e.g. by swapping hash join sides), this rule runs before EnforceDistribution.
14721472
Arc::new(PipelineFixer::new()),
1473-
// BasicEnforcement is for adding essential repartition and local sorting operators
1474-
// to satisfy the required distribution and local sort requirements.
1475-
// Please make sure that the whole plan tree is determined.
1476-
Arc::new(BasicEnforcement::new()),
1477-
// The BasicEnforcement stage conservatively inserts sorts to satisfy ordering requirements.
1478-
// However, a deeper analysis may sometimes reveal that such a sort is actually unnecessary.
1479-
// These cases typically arise when we have reversible window expressions or deep subqueries.
1480-
// The rule below performs this analysis and removes unnecessary sorts.
1481-
Arc::new(OptimizeSorts::new()),
1473+
// The EnforceDistribution rule is for adding essential repartition to satisfy the required
1474+
// distribution. Please make sure that the whole plan tree is determined before this rule.
1475+
Arc::new(EnforceDistribution::new()),
1476+
// The EnforceSorting rule is for adding essential local sorting to satisfy the required
1477+
// ordering. Please make sure that the whole plan tree is determined before this rule.
1478+
// Note that one should always run this rule after running the EnforceDistribution rule
1479+
// as the latter may break local sorting requirements.
1480+
Arc::new(EnforceSorting::new()),
14821481
// The CoalesceBatches rule will not influence the distribution and ordering of the
14831482
// whole plan tree. Therefore, to avoid influencing other rules, it should run last.
14841483
Arc::new(CoalesceBatches::new()),

datafusion/core/src/physical_optimizer/enforcement.rs renamed to datafusion/core/src/physical_optimizer/dist_enforcement.rs

Lines changed: 30 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
19-
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
20-
//!
18+
//! EnforceDistribution optimizer rule inspects the physical plan with respect
19+
//! to distribution requirements and adds [RepartitionExec]s to satisfy them
20+
//! when necessary.
2121
use crate::config::ConfigOptions;
2222
use crate::error::Result;
23-
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
2423
use crate::physical_optimizer::PhysicalOptimizerRule;
2524
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
2625
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -46,25 +45,25 @@ use datafusion_physical_expr::{
4645
use std::collections::HashMap;
4746
use std::sync::Arc;
4847

49-
/// BasicEnforcement rule, it ensures the Distribution and Ordering requirements are met
50-
/// in the strictest way. It might add additional [[RepartitionExec]] to the plan tree
48+
/// The EnforceDistribution rule ensures that distribution requirements are met
49+
/// in the strictest way. It might add additional [RepartitionExec] to the plan tree
5150
/// and give a non-optimal plan, but it can avoid the possible data skew in joins.
5251
///
5352
/// For example for a HashJoin with keys(a, b, c), the required Distribution(a, b, c) can be satisfied by
5453
/// several alternative partitioning ways: [(a, b, c), (a, b), (a, c), (b, c), (a), (b), (c), ( )].
5554
///
5655
/// This rule only chooses the exact match and satisfies the Distribution(a, b, c) by a HashPartition(a, b, c).
5756
#[derive(Default)]
58-
pub struct BasicEnforcement {}
57+
pub struct EnforceDistribution {}
5958

60-
impl BasicEnforcement {
59+
impl EnforceDistribution {
6160
#[allow(missing_docs)]
6261
pub fn new() -> Self {
6362
Self {}
6463
}
6564
}
6665

67-
impl PhysicalOptimizerRule for BasicEnforcement {
66+
impl PhysicalOptimizerRule for EnforceDistribution {
6867
fn optimize(
6968
&self,
7069
plan: Arc<dyn ExecutionPlan>,
@@ -81,24 +80,21 @@ impl PhysicalOptimizerRule for BasicEnforcement {
8180
} else {
8281
plan
8382
};
84-
// Distribution and Ordering enforcement need to be applied bottom-up.
83+
// Distribution enforcement needs to be applied bottom-up.
8584
new_plan.transform_up(&{
8685
|plan| {
8786
let adjusted = if !top_down_join_key_reordering {
8887
reorder_join_keys_to_inputs(plan)?
8988
} else {
9089
plan
9190
};
92-
Ok(Some(ensure_distribution_and_ordering(
93-
adjusted,
94-
target_partitions,
95-
)?))
91+
Ok(Some(ensure_distribution(adjusted, target_partitions)?))
9692
}
9793
})
9894
}
9995

10096
fn name(&self) -> &str {
101-
"BasicEnforcement"
97+
"EnforceDistribution"
10298
}
10399

104100
fn schema_check(&self) -> bool {
@@ -829,10 +825,11 @@ fn new_join_conditions(
829825
new_join_on
830826
}
831827

832-
/// Within this function, it checks whether we need to add additional plan operators
833-
/// of data exchanging and data ordering to satisfy the required distribution and ordering.
834-
/// And we should avoid to manually add plan operators of data exchanging and data ordering in other places
835-
fn ensure_distribution_and_ordering(
828+
/// This function checks whether we need to add additional data exchange
829+
/// operators to satisfy distribution requirements. Since this function
830+
/// takes care of such requirements, we should avoid manually adding data
831+
/// exchange operators in other places.
832+
fn ensure_distribution(
836833
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
837834
target_partitions: usize,
838835
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
@@ -841,13 +838,11 @@ fn ensure_distribution_and_ordering(
841838
}
842839

843840
let required_input_distributions = plan.required_input_distribution();
844-
let required_input_orderings = plan.required_input_ordering();
845841
let children: Vec<Arc<dyn ExecutionPlan>> = plan.children();
846842
assert_eq!(children.len(), required_input_distributions.len());
847-
assert_eq!(children.len(), required_input_orderings.len());
848843

849844
// Add RepartitionExec to guarantee output partitioning
850-
let children = children
845+
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
851846
.into_iter()
852847
.zip(required_input_distributions.into_iter())
853848
.map(|(child, required)| {
@@ -870,24 +865,8 @@ fn ensure_distribution_and_ordering(
870865
};
871866
new_child
872867
}
873-
});
874-
875-
// Add local SortExec to guarantee output ordering within each partition
876-
let new_children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
877-
.zip(required_input_orderings.into_iter())
878-
.map(|(child_result, required)| {
879-
let child = child_result?;
880-
if ordering_satisfy(child.output_ordering(), required, || {
881-
child.equivalence_properties()
882-
}) {
883-
Ok(child)
884-
} else {
885-
let sort_expr = required.unwrap().to_vec();
886-
add_sort_above_child(&child, sort_expr)
887-
}
888868
})
889869
.collect();
890-
891870
with_new_children_if_necessary(plan, new_children?)
892871
}
893872

@@ -979,6 +958,7 @@ mod tests {
979958
use super::*;
980959
use crate::datasource::listing::PartitionedFile;
981960
use crate::datasource::object_store::ObjectStoreUrl;
961+
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
982962
use crate::physical_plan::aggregates::{
983963
AggregateExec, AggregateMode, PhysicalGroupBy,
984964
};
@@ -1136,8 +1116,15 @@ mod tests {
11361116
config.execution.target_partitions = 10;
11371117

11381118
// run optimizer
1139-
let optimizer = BasicEnforcement {};
1119+
let optimizer = EnforceDistribution {};
11401120
let optimized = optimizer.optimize($PLAN, &config)?;
1121+
// NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
1122+
// because they were written prior to the separation of `BasicEnforcement` into
1123+
// `EnforceSorting` and `EnforceDistribution`.
1124+
// TODO: Orthogonalize the tests here just to verify `EnforceDistribution` and create
1125+
// new tests for the cascade.
1126+
let optimizer = EnforceSorting {};
1127+
let optimized = optimizer.optimize(optimized, &config)?;
11411128

11421129
// Now format correctly
11431130
let plan = displayable(optimized.as_ref()).indent().to_string();
@@ -1656,7 +1643,7 @@ mod tests {
16561643
Column::new_with_schema("c1", &right.schema()).unwrap(),
16571644
),
16581645
];
1659-
let bottom_left_join = ensure_distribution_and_ordering(
1646+
let bottom_left_join = ensure_distribution(
16601647
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
16611648
10,
16621649
)?;
@@ -1686,7 +1673,7 @@ mod tests {
16861673
Column::new_with_schema("a1", &right.schema()).unwrap(),
16871674
),
16881675
];
1689-
let bottom_right_join = ensure_distribution_and_ordering(
1676+
let bottom_right_join = ensure_distribution(
16901677
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
16911678
10,
16921679
)?;
@@ -1775,7 +1762,7 @@ mod tests {
17751762
Column::new_with_schema("b1", &right.schema()).unwrap(),
17761763
),
17771764
];
1778-
let bottom_left_join = ensure_distribution_and_ordering(
1765+
let bottom_left_join = ensure_distribution(
17791766
hash_join_exec(left.clone(), right.clone(), &join_on, &JoinType::Inner),
17801767
10,
17811768
)?;
@@ -1805,7 +1792,7 @@ mod tests {
18051792
Column::new_with_schema("a1", &right.schema()).unwrap(),
18061793
),
18071794
];
1808-
let bottom_right_join = ensure_distribution_and_ordering(
1795+
let bottom_right_join = ensure_distribution(
18091796
hash_join_exec(left, right.clone(), &join_on, &JoinType::Inner),
18101797
10,
18111798
)?;

datafusion/core/src/physical_optimizer/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
2121
pub mod aggregate_statistics;
2222
pub mod coalesce_batches;
23-
pub mod enforcement;
23+
pub mod dist_enforcement;
2424
pub mod global_sort_selection;
2525
pub mod join_selection;
26-
pub mod optimize_sorts;
2726
pub mod optimizer;
2827
pub mod pipeline_checker;
2928
pub mod pruning;
3029
pub mod repartition;
30+
pub mod sort_enforcement;
3131
mod utils;
3232

3333
pub mod pipeline_fixer;

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,8 @@ mod tests {
241241
use super::*;
242242
use crate::datasource::listing::PartitionedFile;
243243
use crate::datasource::object_store::ObjectStoreUrl;
244-
use crate::physical_optimizer::enforcement::BasicEnforcement;
244+
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
245+
use crate::physical_optimizer::sort_enforcement::EnforceSorting;
245246
use crate::physical_plan::aggregates::{
246247
AggregateExec, AggregateMode, PhysicalGroupBy,
247248
};
@@ -370,9 +371,12 @@ mod tests {
370371
// run optimizer
371372
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
372373
Arc::new(Repartition::new()),
373-
// The `BasicEnforcement` is an essential rule to be applied.
374+
// EnforceDistribution is an essential rule to be applied.
374375
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed
375-
Arc::new(BasicEnforcement::new()),
376+
Arc::new(EnforceDistribution::new()),
377+
// EnforceSorting is an essential rule to be applied.
378+
// Otherwise, the correctness of the generated optimized plan cannot be guaranteed
379+
Arc::new(EnforceSorting::new()),
376380
];
377381
let optimized = optimizers.into_iter().fold($PLAN, |plan, optimizer| {
378382
optimizer.optimize(plan, &config).unwrap()

0 commit comments

Comments
 (0)